This is page 12 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/test_files.py:
--------------------------------------------------------------------------------

```python
"""
File System Interface Tests
Tests for the file system methods of the Computer interface, run against a Cua cloud Linux container by default.
Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""

import os
import sys
from pathlib import Path

import pytest

# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")

from computer import Computer, VMProviderType


@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Create a remote Linux computer with Cua
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )

    # Create a local macOS computer with Cua
    # computer = Computer()

    # Connect to host computer
    # computer = Computer(use_host_computer_server=True)

    try:
        await computer.run()
        yield computer
    finally:
        await computer.disconnect()


@pytest.mark.asyncio(loop_scope="session")
async def test_file_exists(computer):
    tmp_path = "test_file_exists.txt"
    # Ensure file does not exist
    if await computer.interface.file_exists(tmp_path):
        await computer.interface.delete_file(tmp_path)
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is False, f"File {tmp_path} should not exist"
    # Create file and check again
    await computer.interface.write_text(tmp_path, "hello")
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is True, f"File {tmp_path} should exist"
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_directory_exists(computer):
    tmp_dir = "test_directory_exists"
    if await computer.interface.directory_exists(tmp_dir):
        # Remove all files in directory before removing directory
        files = await computer.interface.list_dir(tmp_dir)
        for fname in files:
            await computer.interface.delete_file(f"{tmp_dir}/{fname}")
        # Remove the directory itself
        await computer.interface.delete_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is False, f"Directory {tmp_dir} should not exist"
    await computer.interface.create_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is True, f"Directory {tmp_dir} should exist"
    # Cleanup: remove files and directory
    files = await computer.interface.list_dir(tmp_dir)
    for fname in files:
        await computer.interface.delete_file(f"{tmp_dir}/{fname}")
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_list_dir(computer):
    tmp_dir = "test_list_dir"
    if not await computer.interface.directory_exists(tmp_dir):
        await computer.interface.create_dir(tmp_dir)
    files = ["foo.txt", "bar.txt"]
    for fname in files:
        await computer.interface.write_text(f"{tmp_dir}/{fname}", "hi")
    result = await computer.interface.list_dir(tmp_dir)
    assert set(result) >= set(files), f"Directory {tmp_dir} should contain files {files}"
    for fname in files:
        await computer.interface.delete_file(f"{tmp_dir}/{fname}")
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text(computer):
    tmp_path = "test_rw_text.txt"
    content = "sample text"
    await computer.interface.write_text(tmp_path, content)
    read = await computer.interface.read_text(tmp_path)
    assert read == content, "File content should match"
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_delete_file(computer):
    tmp_path = "test_delete_file.txt"
    await computer.interface.write_text(tmp_path, "bye")
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is True, "File should exist"
    await computer.interface.delete_file(tmp_path)
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is False, "File should not exist"


@pytest.mark.asyncio(loop_scope="session")
async def test_create_dir(computer):
    tmp_dir = "test_create_dir"
    if await computer.interface.directory_exists(tmp_dir):
        await computer.interface.delete_dir(tmp_dir)
    await computer.interface.create_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is True, "Directory should exist"
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_basic(computer):
    """Test basic read_bytes functionality."""
    tmp_path = "test_read_bytes.bin"
    test_data = b"Hello, World! This is binary data \x00\x01\x02\x03"

    # Write binary data via write_text by round-tripping through latin-1,
    # which maps bytes 0-255 to code points one-to-one
    await computer.interface.write_text(tmp_path, test_data.decode("latin-1"))

    # Read all bytes
    read_data = await computer.interface.read_bytes(tmp_path)
    assert read_data == test_data, "Binary data should match"

    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_with_offset_and_length(computer):
    """Test read_bytes with offset and length parameters."""
    tmp_path = "test_read_bytes_offset.bin"
    test_data = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

    # Write test data
    await computer.interface.write_text(tmp_path, test_data.decode("latin-1"))

    # Test reading with offset only
    read_data = await computer.interface.read_bytes(tmp_path, offset=5)
    expected = test_data[5:]
    assert (
        read_data == expected
    ), f"Data from offset 5 should match. Got: {read_data}, Expected: {expected}"

    # Test reading with offset and length
    read_data = await computer.interface.read_bytes(tmp_path, offset=10, length=5)
    expected = test_data[10:15]
    assert (
        read_data == expected
    ), f"Data from offset 10, length 5 should match. Got: {read_data}, Expected: {expected}"

    # Test reading from beginning with length
    read_data = await computer.interface.read_bytes(tmp_path, offset=0, length=10)
    expected = test_data[:10]
    assert (
        read_data == expected
    ), f"Data from beginning, length 10 should match. Got: {read_data}, Expected: {expected}"

    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_get_file_size(computer):
    """Test get_file_size functionality."""
    tmp_path = "test_file_size.txt"
    test_content = "A" * 1000  # 1000 bytes

    await computer.interface.write_text(tmp_path, test_content)

    file_size = await computer.interface.get_file_size(tmp_path)
    assert file_size == 1000, f"File size should be 1000 bytes, got {file_size}"

    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_large_file(computer):
    """Test reading a file larger than 10MB to verify chunked reading."""
    tmp_path = "test_large_file.bin"

    # Create a file larger than 10MB (10 * 1024 * 1024 = 10,485,760 bytes)
    total_size = 12 * 1024 * 1024  # 12MB

    print(f"Creating large file of {total_size} bytes ({total_size / (1024*1024):.1f}MB)...")

    # Create large file content (this will test the chunked writing functionality)
    large_content = b"X" * total_size

    # Write the large file using write_bytes (will automatically use chunked writing)
    await computer.interface.write_bytes(tmp_path, large_content)

    # Verify file size
    file_size = await computer.interface.get_file_size(tmp_path)
    assert file_size == total_size, f"Large file size should be {total_size} bytes, got {file_size}"

    print(f"Large file created successfully: {file_size} bytes")

    # Test reading the entire large file (should use chunked reading)
    print("Reading large file...")
    read_data = await computer.interface.read_bytes(tmp_path)
    assert (
        len(read_data) == total_size
    ), f"Read data size should match file size. Got {len(read_data)}, expected {total_size}"

    # Verify content (should be all 'X' characters)
    expected_data = b"X" * total_size
    assert read_data == expected_data, "Large file content should be all 'X' characters"

    print("Large file read successfully!")

    # Test reading with offset and length on large file
    offset = 5 * 1024 * 1024  # 5MB offset
    length = 2 * 1024 * 1024  # 2MB length
    read_data = await computer.interface.read_bytes(tmp_path, offset=offset, length=length)
    assert len(read_data) == length, f"Partial read size should be {length}, got {len(read_data)}"
    assert read_data == b"X" * length, "Partial read content should be all 'X' characters"

    print("Large file partial read successful!")

    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Large file test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text_with_encoding(computer):
    """Test reading and writing text files with different encodings."""
    print("Testing text file operations with different encodings...")

    tmp_path = "test_encoding.txt"

    # Test UTF-8 encoding (default)
    utf8_content = "Hello, 世界! 🌍 Ñoño café"
    await computer.interface.write_text(tmp_path, utf8_content, encoding="utf-8")
    read_utf8 = await computer.interface.read_text(tmp_path, encoding="utf-8")
    assert read_utf8 == utf8_content, "UTF-8 content should match"

    # Test ASCII encoding
    ascii_content = "Hello, World! Simple ASCII text."
    await computer.interface.write_text(tmp_path, ascii_content, encoding="ascii")
    read_ascii = await computer.interface.read_text(tmp_path, encoding="ascii")
    assert read_ascii == ascii_content, "ASCII content should match"

    # Test Latin-1 encoding
    latin1_content = "Café, naïve, résumé"
    await computer.interface.write_text(tmp_path, latin1_content, encoding="latin-1")
    read_latin1 = await computer.interface.read_text(tmp_path, encoding="latin-1")
    assert read_latin1 == latin1_content, "Latin-1 content should match"

    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text encoding test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_write_text_append_mode(computer):
    """Test appending text to files."""
    print("Testing text file append mode...")

    tmp_path = "test_append.txt"

    # Write initial content
    initial_content = "First line\n"
    await computer.interface.write_text(tmp_path, initial_content)

    # Append more content
    append_content = "Second line\n"
    await computer.interface.write_text(tmp_path, append_content, append=True)

    # Read and verify
    final_content = await computer.interface.read_text(tmp_path)
    expected_content = initial_content + append_content
    assert (
        final_content == expected_content
    ), f"Expected '{expected_content}', got '{final_content}'"

    # Append one more line
    third_content = "Third line\n"
    await computer.interface.write_text(tmp_path, third_content, append=True)

    # Read and verify final result
    final_content = await computer.interface.read_text(tmp_path)
    expected_content = initial_content + append_content + third_content
    assert (
        final_content == expected_content
    ), f"Expected '{expected_content}', got '{final_content}'"

    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text append test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_large_text_file(computer):
    """Test reading and writing large text files (>5MB) to verify chunked operations."""
    print("Testing large text file operations...")

    tmp_path = "test_large_text.txt"

    # Create a large text content (approximately 6MB)
    # Each line is about 100 characters, so 60,000 lines ≈ 6MB
    line_template = "This is line {:06d} with some additional text to make it longer and reach about 100 chars.\n"
    num_lines = 60000

    print(f"Generating large text content with {num_lines} lines...")
    # Build with join instead of repeated += to avoid quadratic string copies
    large_content = "".join(line_template.format(i) for i in range(num_lines))

    content_size_mb = len(large_content.encode("utf-8")) / (1024 * 1024)
    print(f"Generated text content size: {content_size_mb:.2f} MB")

    # Write the large text file
    print("Writing large text file...")
    await computer.interface.write_text(tmp_path, large_content)

    # Read the entire file back
    print("Reading large text file...")
    read_content = await computer.interface.read_text(tmp_path)

    # Verify content matches
    assert read_content == large_content, "Large text file content should match exactly"

    # Test partial reading by reading as bytes and decoding specific portions
    print("Testing partial text reading...")

    # Read first 1000 characters worth of bytes
    first_1000_chars = large_content[:1000]
    first_1000_bytes = first_1000_chars.encode("utf-8")
    read_bytes = await computer.interface.read_bytes(
        tmp_path, offset=0, length=len(first_1000_bytes)
    )
    decoded_partial = read_bytes.decode("utf-8")
    assert decoded_partial == first_1000_chars, "Partial text reading should match"

    # Test appending to large file
    print("Testing append to large text file...")
    append_text = "\n--- APPENDED CONTENT ---\nThis content was appended to the large file.\n"
    await computer.interface.write_text(tmp_path, append_text, append=True)

    # Read and verify appended content
    final_content = await computer.interface.read_text(tmp_path)
    expected_final = large_content + append_text
    assert final_content == expected_final, "Appended large text file should match"

    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Large text file test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_text_file_edge_cases(computer):
    """Test edge cases for text file operations."""
    print("Testing text file edge cases...")

    tmp_path = "test_edge_cases.txt"

    # Test empty file
    empty_content = ""
    await computer.interface.write_text(tmp_path, empty_content)
    read_empty = await computer.interface.read_text(tmp_path)
    assert read_empty == empty_content, "Empty file should return empty string"

    # Test file with only whitespace
    whitespace_content = "   \n\t\r\n   \n"
    await computer.interface.write_text(tmp_path, whitespace_content)
    read_whitespace = await computer.interface.read_text(tmp_path)
    assert read_whitespace == whitespace_content, "Whitespace content should be preserved"

    # Test file with special characters and newlines
    special_content = "Line 1\nLine 2\r\nLine 3\tTabbed\nSpecial: !@#$%^&*()\n"
    await computer.interface.write_text(tmp_path, special_content)
    read_special = await computer.interface.read_text(tmp_path)
    assert read_special == special_content, "Special characters should be preserved"

    # Test very long single line (no newlines)
    long_line = "A" * 10000  # 10KB single line
    await computer.interface.write_text(tmp_path, long_line)
    read_long_line = await computer.interface.read_text(tmp_path)
    assert read_long_line == long_line, "Long single line should be preserved"

    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text file edge cases test completed successfully!")


if __name__ == "__main__":
    # Run tests directly
    pytest.main([__file__, "-v"])

```
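
For reference, a minimal standalone sketch (not part of the repository) of the same file-system round trip outside pytest. It reuses only the constructor arguments and interface methods exercised above, and assumes `CUA_API_KEY` and `CUA_CONTAINER_NAME` are set in the environment:

```python
# Sketch only: write, read back, and delete a file through the Computer
# interface, mirroring test_read_write_text above.
import asyncio
import os

from computer import Computer, VMProviderType


async def main():
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    try:
        await computer.run()
        await computer.interface.write_text("demo.txt", "hello")
        assert await computer.interface.read_text("demo.txt") == "hello"
        await computer.interface.delete_file("demo.txt")
    finally:
        await computer.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```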

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/generic.py:
--------------------------------------------------------------------------------

```python
"""
Generic handlers for all OSes.

Includes:
- GenericDesktopHandler
- GenericWindowHandler
- GenericFileHandler

"""

import base64
import os
import platform
import subprocess
import webbrowser
from pathlib import Path
from typing import Any, Dict, Optional

from ..utils import wallpaper
from .base import BaseDesktopHandler, BaseFileHandler, BaseWindowHandler

try:
    import pywinctl as pwc
except Exception:  # pragma: no cover
    pwc = None  # type: ignore


def resolve_path(path: str) -> Path:
    """Resolve a path to its absolute path. Expand ~ to the user's home directory.

    Args:
        path: The file or directory path to resolve

    Returns:
        Path: The resolved absolute path
    """
    return Path(path).expanduser().resolve()


# ===== Cross-platform Desktop command handlers =====


class GenericDesktopHandler(BaseDesktopHandler):
    """
    Generic desktop handler providing desktop-related operations.

    Implements:
    - get_desktop_environment: detect current desktop environment
    - set_wallpaper: set desktop wallpaper path
    """

    async def get_desktop_environment(self) -> Dict[str, Any]:
        """
        Get the current desktop environment.

        Returns:
            Dict containing 'success' boolean and either 'environment' string or 'error' string
        """
        try:
            env = wallpaper.get_desktop_environment()
            return {"success": True, "environment": env}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_wallpaper(self, path: str) -> Dict[str, Any]:
        """
        Set the desktop wallpaper to the specified path.

        Args:
            path: The file path to set as wallpaper

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            file_path = resolve_path(path)
            ok = wallpaper.set_wallpaper(str(file_path))
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}


# ===== Cross-platform window control command handlers =====


class GenericWindowHandler(BaseWindowHandler):
    """
    Cross-platform window management using pywinctl where possible.
    """

    async def open(self, target: str) -> Dict[str, Any]:
        try:
            if target.startswith("http://") or target.startswith("https://"):
                ok = webbrowser.open(target)
                return {"success": bool(ok)}
            path = str(resolve_path(target))
            system = platform.system().lower()
            if system == "darwin":
                subprocess.Popen(["open", path])
            elif system == "linux":
                subprocess.Popen(["xdg-open", path])
            elif system == "windows":
                os.startfile(path)  # type: ignore[attr-defined]
            else:
                return {"success": False, "error": f"Unsupported OS: {system}"}
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def launch(self, app: str, args: Optional[list[str]] = None) -> Dict[str, Any]:
        try:
            if args:
                proc = subprocess.Popen([app, *args])
            else:
                # allow shell command like "libreoffice --writer"
                proc = subprocess.Popen(app, shell=True)
            return {"success": True, "pid": proc.pid}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _get_window_by_id(self, window_id: int | str) -> Optional[Any]:
        if pwc is None:
            raise RuntimeError("pywinctl not available")
        # Find by native handle among Window objects; getAllWindowsDict keys are titles
        try:
            for w in pwc.getAllWindows():
                if str(w.getHandle()) == str(window_id):
                    return w
            return None
        except Exception:
            return None

    async def get_current_window_id(self) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            win = pwc.getActiveWindow()
            if not win:
                return {"success": False, "error": "No active window"}
            return {"success": True, "window_id": win.getHandle()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_application_windows(self, app: str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            wins = pwc.getWindowsWithTitle(app, condition=pwc.Re.CONTAINS, flags=pwc.Re.IGNORECASE)
            ids = [w.getHandle() for w in wins]
            return {"success": True, "windows": ids}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_name(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            return {"success": True, "name": w.title}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_size(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            width, height = w.size
            return {"success": True, "width": int(width), "height": int(height)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_position(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            x, y = w.position
            return {"success": True, "x": int(x), "y": int(y)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_window_size(
        self, window_id: int | str, width: int, height: int
    ) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.resizeTo(int(width), int(height))
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_window_position(self, window_id: int | str, x: int, y: int) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.moveTo(int(x), int(y))
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def maximize_window(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.maximize()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def minimize_window(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.minimize()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def activate_window(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.activate()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def close_window(self, window_id: int | str) -> Dict[str, Any]:
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.close()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}


# ===== Cross-platform file system command handlers =====


class GenericFileHandler(BaseFileHandler):
    """
    Generic file handler that provides file system operations for all operating systems.

    This class implements the BaseFileHandler interface and provides methods for
    file and directory operations including reading, writing, creating, and deleting
    files and directories.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a file exists at the specified path.

        Args:
            path: The file path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_file()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """
        Check if a directory exists at the specified path.

        Args:
            path: The directory path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            return {"success": True, "exists": resolve_path(path).is_dir()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """
        List all files and directories in the specified directory.

        Args:
            path: The directory path to list

        Returns:
            Dict containing 'success' boolean and either 'files' list of names or 'error' string
        """
        try:
            return {
                "success": True,
                "files": [
                    p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()
                ],
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """
        Read the contents of a text file.

        Args:
            path: The file path to read from

        Returns:
            Dict containing 'success' boolean and either 'content' string or 'error' string
        """
        try:
            return {"success": True, "content": resolve_path(path).read_text()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """
        Write text content to a file.

        Args:
            path: The file path to write to
            content: The text content to write

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).write_text(content)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_bytes(
        self, path: str, content_b64: str, append: bool = False
    ) -> Dict[str, Any]:
        """
        Write binary content to a file from base64 encoded string.

        Args:
            path: The file path to write to
            content_b64: Base64 encoded binary content
            append: If True, append to existing file; if False, overwrite

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            mode = "ab" if append else "wb"
            with open(resolve_path(path), mode) as f:
                f.write(base64.b64decode(content_b64))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_bytes(
        self, path: str, offset: int = 0, length: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Read binary content from a file and return as base64 encoded string.

        Args:
            path: The file path to read from
            offset: Byte offset to start reading from
            length: Number of bytes to read; if None, read entire file from offset

        Returns:
            Dict containing 'success' boolean and either 'content_b64' string or 'error' string
        """
        try:
            file_path = resolve_path(path)
            with open(file_path, "rb") as f:
                if offset > 0:
                    f.seek(offset)

                if length is not None:
                    content = f.read(length)
                else:
                    content = f.read()

            return {"success": True, "content_b64": base64.b64encode(content).decode("utf-8")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """
        Get the size of a file in bytes.

        Args:
            path: The file path to get size for

        Returns:
            Dict containing 'success' boolean and either 'size' integer or 'error' string
        """
        try:
            file_path = resolve_path(path)
            size = file_path.stat().st_size
            return {"success": True, "size": size}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """
        Delete a file at the specified path.

        Args:
            path: The file path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).unlink()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """
        Create a directory at the specified path.

        Creates parent directories if they don't exist and doesn't raise an error
        if the directory already exists.

        Args:
            path: The directory path to create

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).mkdir(parents=True, exist_ok=True)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """
        Delete an empty directory at the specified path.

        Args:
            path: The directory path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            resolve_path(path).rmdir()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
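
# --- Illustrative usage sketch (not part of the original module) ---
# Every handler method is async and reports failure through the returned
# dict rather than raising, so callers branch on result["success"]:
#
#     handler = GenericFileHandler()
#     result = await handler.write_text("/tmp/demo.txt", "hello")
#     if result["success"]:
#         print((await handler.read_text("/tmp/demo.txt"))["content"])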

```

--------------------------------------------------------------------------------
/docs/content/docs/example-usecases/post-event-contact-export.mdx:
--------------------------------------------------------------------------------

```markdown
---
title: Post-Event Contact Export
description: Run overnight contact extraction from LinkedIn, X, or other social platforms after networking events
---

import { Step, Steps } from 'fumadocs-ui/components/steps';
import { Tab, Tabs } from 'fumadocs-ui/components/tabs';

## Overview

After networking events, you need to export new connections from LinkedIn, X, or other platforms into your CRM. This automation handles it for you.

**The workflow**: Kick off the script after an event and let it run overnight. Wake up to a clean CSV ready for your CRM or email tool.

This example focuses on LinkedIn but works across platforms. It uses [Cua Computer](/computer-sdk/computers) to interact with web interfaces and [Agent Loops](/agent-sdk/agent-loops) to iterate through connections with conversation history.
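
The core pattern is a single conversation `history` list that every task appends to, so the agent remembers which contacts it has already visited. A minimal sketch of that loop (the full script below uses exactly this shape; `agent` is a `ComputerAgent` wired to a `Computer`):

```python
async def run_tasks(agent):
    history = []
    for task in ["Open the connections page", "Extract the first contact"]:
        history.append({"role": "user", "content": task})
        # agent.run consumes the whole history and yields result chunks;
        # appending each chunk's output preserves context for the next task
        async for result in agent.run(history, stream=False):
            history += result.get("output", [])
    return history
```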

### Why Cua is Perfect for This

**Cua's VMs save your session data**, bypassing bot detection entirely:

- **Log in once manually** through the VM browser
- **Session persists** - you appear as a regular user, not a bot
- **No captchas** - the platform treats automation like normal browsing
- **No login code** - script doesn't handle authentication
- **Run overnight** - kick off and forget

Traditional web scraping triggers anti-bot measures immediately. Cua's approach works across all platforms.

### What You Get

The script generates two files with your extracted connections:

**CSV Export** (`linkedin_connections_20250116_143022.csv`):

```csv
first,last,role,company,met_at,linkedin
John,Smith,Software Engineer,Acme Corp,Google Devfest Toronto,https://www.linkedin.com/in/johnsmith
Sarah,Johnson,Product Manager,Tech Inc,Google Devfest Toronto,https://www.linkedin.com/in/sarahjohnson
```

**Messaging Links** (`linkedin_messaging_links_20250116_143022.txt`):

```
LinkedIn Messaging Compose Links
================================================================================

1. https://www.linkedin.com/messaging/compose/?recipient=johnsmith
2. https://www.linkedin.com/messaging/compose/?recipient=sarahjohnson
```

---

<Steps>

<Step>

### Set Up Your Environment

First, create a `requirements.txt` file with the required dependencies:

```text
cua-agent
cua-computer
python-dotenv>=1.0.0
```

Install the dependencies:

```bash
pip install -r requirements.txt
```

Create a `.env` file with your API keys:

```text
ANTHROPIC_API_KEY=your-anthropic-api-key # optional, BYOK. By default, this cookbook uses the CUA VLM Router
CUA_API_KEY=sk_cua-api01...
CUA_CONTAINER_NAME=m-linux-...
```

Finally, set up your VM. Refer to the [quickstart guide](https://cua.ai/docs/get-started/quickstart) for how to set up the computer environment.
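
Optionally, run a quick pre-flight check so an overnight run doesn't fail on a missing key (this mirrors the validation performed by the script below):

```python
# Optional pre-flight check: fail fast if .env is missing required keys
import os
from dotenv import load_dotenv

load_dotenv()
for key in ("CUA_API_KEY", "CUA_CONTAINER_NAME"):
    if not os.environ.get(key):
        raise RuntimeError(f"Please set {key} in .env")
print("Environment looks good")
```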
</Step>

<Step>

### Log Into LinkedIn Manually

**Important**: Before running the script, manually log into LinkedIn through your VM:

1. Access your VM through the Cua dashboard
2. Open a browser and navigate to LinkedIn
3. Log in with your credentials (handle any captchas manually)
4. Close the browser but leave the VM running
5. Your session is now saved and ready for automation!

This one-time manual login bypasses all bot detection.

</Step>

<Step>

### Configure and Create Your Script

Create a Python file (e.g., `contact_export.py`). You can customize:

```python
# Where you met these connections (automatically added to CSV)
MET_AT_REASON = "Google Devfest Toronto"

# Number of contacts to extract (in the main loop)
for contact_num in range(1, 21):  # Change 21 to extract more/fewer contacts
```

Select your environment:

<Tabs items={['Cloud Sandbox', 'Linux on Docker', 'macOS Sandbox', 'Windows Sandbox']}>
  <Tab value="Cloud Sandbox">

```python
import asyncio
import csv
import logging
import os
import signal
import traceback
from datetime import datetime

from agent import ComputerAgent
from computer import Computer, VMProviderType
from dotenv import load_dotenv

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration: Define where you met these connections
MET_AT_REASON = "Google Devfest Toronto"

def handle_sigint(sig, frame):
    print("\n\nExecution interrupted by user. Exiting gracefully...")
    exit(0)

def extract_public_id_from_linkedin_url(linkedin_url):
    """Extract public ID from LinkedIn profile URL."""
    if not linkedin_url:
        return None

    url = linkedin_url.split('?')[0].rstrip('/')

    if '/in/' in url:
        public_id = url.split('/in/')[-1]
        return public_id

    return None

def extract_contact_from_response(result_output):
    """
    Extract contact information from agent's response.
    Expects format:
    FIRST: value
    LAST: value
    ROLE: value
    COMPANY: value
    LINKEDIN: value
    """
    contact = {
        'first': '',
        'last': '',
        'role': '',
        'company': '',
        'met_at': MET_AT_REASON,
        'linkedin': ''
    }

    for item in result_output:
        if item.get("type") == "message":
            content = item.get("content", [])
            for content_part in content:
                text = content_part.get("text", "")
                if text:
                    for line in text.split('\n'):
                        line = line.strip()
                        line_upper = line.upper()

                        if line_upper.startswith("FIRST:"):
                            value = line[6:].strip()
                            if value and value.upper() != "N/A":
                                contact['first'] = value
                        elif line_upper.startswith("LAST:"):
                            value = line[5:].strip()
                            if value and value.upper() != "N/A":
                                contact['last'] = value
                        elif line_upper.startswith("ROLE:"):
                            value = line[5:].strip()
                            if value and value.upper() != "N/A":
                                contact['role'] = value
                        elif line_upper.startswith("COMPANY:"):
                            value = line[8:].strip()
                            if value and value.upper() != "N/A":
                                contact['company'] = value
                        elif line_upper.startswith("LINKEDIN:"):
                            value = line[9:].strip()
                            if value and value.upper() != "N/A":
                                contact['linkedin'] = value

    return contact

async def scrape_linkedin_connections():
    """Scrape LinkedIn connections and export to CSV."""

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_filename = f"linkedin_connections_{timestamp}.csv"
    csv_path = os.path.join(os.getcwd(), csv_filename)

    # Initialize CSV file
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['first', 'last', 'role', 'company', 'met_at', 'linkedin'])
        writer.writeheader()

    print(f"\n🚀 Starting LinkedIn connections scraper")
    print(f"📁 Output file: {csv_path}")
    print(f"📍 Met at: {MET_AT_REASON}")
    print("=" * 80)

    try:
        async with Computer(
            os_type="linux",
            provider_type=VMProviderType.CLOUD,
            name=os.environ["CUA_CONTAINER_NAME"],  # Your sandbox name
            api_key=os.environ["CUA_API_KEY"],
            verbosity=logging.INFO,
        ) as computer:

            agent = ComputerAgent(
                model="cua/anthropic/claude-sonnet-4.5",
                tools=[computer],
                only_n_most_recent_images=3,
                verbosity=logging.INFO,
                trajectory_dir="trajectories",
                use_prompt_caching=True,
                max_trajectory_budget=10.0,
            )

            history = []

            # Task 1: Navigate to LinkedIn connections page
            navigation_task = (
                "STEP 1 - NAVIGATE TO LINKEDIN CONNECTIONS PAGE:\n"
                "1. Open a web browser (Chrome or Firefox)\n"
                "2. Navigate to https://www.linkedin.com/mynetwork/invite-connect/connections/\n"
                "3. Wait for the page to fully load\n"
                "4. Confirm you can see the list of connections\n"
                "5. Ready to start extracting contacts"
            )

            print(f"\n[Task 1/21] Navigating to LinkedIn...")
            history.append({"role": "user", "content": navigation_task})

            async for result in agent.run(history, stream=False):
                history += result.get("output", [])

            print(f"✅ Navigation completed\n")

            # Extract 20 contacts
            contacts_extracted = 0
            linkedin_urls = []
            previous_contact_name = None

            for contact_num in range(1, 21):
                # Build extraction task
                if contact_num == 1:
                    extraction_task = (
                        f"STEP {contact_num + 1} - EXTRACT CONTACT {contact_num} OF 20:\n"
                        f"1. Click on the first connection's profile\n"
                        f"2. Extract: FIRST, LAST, ROLE, COMPANY, LINKEDIN URL\n"
                        f"3. Return in exact format:\n"
                        f"FIRST: [value]\n"
                        f"LAST: [value]\n"
                        f"ROLE: [value]\n"
                        f"COMPANY: [value]\n"
                        f"LINKEDIN: [value]\n"
                        f"4. Navigate back to connections list"
                    )
                else:
                    extraction_task = (
                        f"STEP {contact_num + 1} - EXTRACT CONTACT {contact_num} OF 20:\n"
                        f"1. Find '{previous_contact_name}' in the list\n"
                        f"2. Click on the contact BELOW them\n"
                        f"3. Extract: FIRST, LAST, ROLE, COMPANY, LINKEDIN URL\n"
                        f"4. Return in exact format:\n"
                        f"FIRST: [value]\n"
                        f"LAST: [value]\n"
                        f"ROLE: [value]\n"
                        f"COMPANY: [value]\n"
                        f"LINKEDIN: [value]\n"
                        f"5. Navigate back"
                    )

                print(f"[Task {contact_num + 1}/21] Extracting contact {contact_num}/20...")
                history.append({"role": "user", "content": extraction_task})

                all_output = []
                async for result in agent.run(history, stream=False):
                    output = result.get("output", [])
                    history += output
                    all_output.extend(output)

                contact_data = extract_contact_from_response(all_output)

                has_name = bool(contact_data['first'] and contact_data['last'])
                has_linkedin = bool(contact_data['linkedin'] and 'linkedin.com' in contact_data['linkedin'])

                if has_name or has_linkedin:
                    with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
                        writer = csv.DictWriter(csvfile, fieldnames=['first', 'last', 'role', 'company', 'met_at', 'linkedin'])
                        writer.writerow(contact_data)
                    contacts_extracted += 1

                    if contact_data['linkedin']:
                        linkedin_urls.append(contact_data['linkedin'])

                    if has_name:
                        previous_contact_name = f"{contact_data['first']} {contact_data['last']}".strip()

                    name_str = f"{contact_data['first']} {contact_data['last']}" if has_name else "[No name]"
                    print(f"✅ Contact {contact_num}/20 saved: {name_str}")
                else:
                    print(f"⚠️  Could not extract valid data for contact {contact_num}")

                if contact_num % 5 == 0:
                    print(f"\n📈 Progress: {contacts_extracted}/{contact_num} contacts extracted\n")

            # Create messaging links file
            messaging_filename = f"linkedin_messaging_links_{timestamp}.txt"
            messaging_path = os.path.join(os.getcwd(), messaging_filename)

            with open(messaging_path, 'w', encoding='utf-8') as txtfile:
                txtfile.write("LinkedIn Messaging Compose Links\n")
                txtfile.write("=" * 80 + "\n\n")

                for i, linkedin_url in enumerate(linkedin_urls, 1):
                    public_id = extract_public_id_from_linkedin_url(linkedin_url)
                    if public_id:
                        messaging_url = f"https://www.linkedin.com/messaging/compose/?recipient={public_id}"
                        txtfile.write(f"{i}. {messaging_url}\n")

            print("\n" + "="*80)
            print("🎉 All tasks completed!")
            print(f"📁 CSV file saved to: {csv_path}")
            print(f"📊 Total contacts extracted: {contacts_extracted}/20")
            print(f"💬 Messaging links saved to: {messaging_path}")
            print("="*80)

    except Exception as e:
        print(f"\n❌ Error: {e}")
        traceback.print_exc()
        raise

def main():
    try:
        load_dotenv()

        if "ANTHROPIC_API_KEY" not in os.environ:
            raise RuntimeError("Please set ANTHROPIC_API_KEY in .env")

        if "CUA_API_KEY" not in os.environ:
            raise RuntimeError("Please set CUA_API_KEY in .env")

        if "CUA_CONTAINER_NAME" not in os.environ:
            raise RuntimeError("Please set CUA_CONTAINER_NAME in .env")

        signal.signal(signal.SIGINT, handle_sigint)

        asyncio.run(scrape_linkedin_connections())

    except Exception as e:
        print(f"\n❌ Error: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    main()
```

  </Tab>
  <Tab value="Linux on Docker">

```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
    os_type="linux",
    provider_type=VMProviderType.DOCKER,
    image="trycua/cua-xfce:latest",
    verbosity=logging.INFO,
) as computer:
```

And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.

  </Tab>
  <Tab value="macOS Sandbox">

```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
    os_type="macos",
    provider_type=VMProviderType.LUME,
    name="macos-sequoia-cua:latest",
    verbosity=logging.INFO,
) as computer:
```

And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.

  </Tab>
  <Tab value="Windows Sandbox">

```python
# Same code as Cloud Sandbox, but change Computer initialization to:
async with Computer(
    os_type="windows",
    provider_type=VMProviderType.WINDOWS_SANDBOX,
    verbosity=logging.INFO,
) as computer:
```

And remove the `CUA_API_KEY` and `CUA_CONTAINER_NAME` requirements from `.env` and the validation checks.

  </Tab>
</Tabs>

</Step>

<Step>

### Run Your Script

Execute your contact extraction automation:

```bash
python contact_export.py
```

The agent will:

1. Navigate to your LinkedIn connections page
2. Extract data from 20 contacts (first name, last name, role, company, LinkedIn URL)
3. Save contacts to a timestamped CSV file
4. Generate messaging compose links for easy follow-up

Monitor the output to see the agent's progress. The script will show a progress update every 5 contacts.
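
Console output will look roughly like this (paths, timestamps, and names will differ):

```text
🚀 Starting LinkedIn connections scraper
📁 Output file: /home/you/linkedin_connections_20250116_143022.csv
📍 Met at: Google Devfest Toronto
================================================================================

[Task 1/21] Navigating to LinkedIn...
✅ Navigation completed

[Task 2/21] Extracting contact 1/20...
✅ Contact 1/20 saved: John Smith

...

📈 Progress: 5/5 contacts extracted
```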

</Step>

</Steps>

---

## How It Works

This script demonstrates a practical workflow for extracting LinkedIn connection data:

1. **Session Persistence** - Manually log into LinkedIn through the VM once, and the VM saves your session
2. **Navigation** - The script navigates to your connections page using your saved authenticated session
3. **Data Extraction** - For each contact, the agent clicks their profile, extracts data, and navigates back
4. **Python Processing** - Python parses responses, validates data, and writes to CSV incrementally (condensed in the sketch below)
5. **Output Files** - Generates a CSV with contact data and a text file with messaging URLs
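
Condensed, steps 3 and 4 reduce to this per-contact loop (simplified from the script above; `FIELDS` abbreviates the CSV column list):

```python
history.append({"role": "user", "content": extraction_task})

all_output = []
async for result in agent.run(history, stream=False):
    output = result.get("output", [])
    history += output          # keep conversation context for the next contact
    all_output.extend(output)

contact = extract_contact_from_response(all_output)  # parse FIRST:/LAST:/... lines
if contact["first"] or contact["linkedin"]:
    # append one row at a time so a crash never loses earlier contacts
    with open(csv_path, "a", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=FIELDS).writerow(contact)
```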

## Next Steps

- Learn more about [Cua computers](/computer-sdk/computers) and [computer commands](/computer-sdk/commands)
- Read about [Agent loops](/agent-sdk/agent-loops), [tools](/agent-sdk/custom-tools), and [supported model providers](/agent-sdk/supported-model-providers/)
- Experiment with different [Models and Providers](/agent-sdk/supported-model-providers/)
- Adapt this script for other platforms (Twitter/X, email extraction, etc.)
- Join our [Discord community](https://discord.com/invite/mVnXXpdE85) for help

```

--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import base64
import inspect
import logging
import os
import signal
import sys
import traceback
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union

import anyio

# Configure logging to output to stderr for debug visibility
logging.basicConfig(
    level=logging.DEBUG,  # Changed to DEBUG
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger("mcp-server")

# More visible startup message
logger.debug("MCP Server module loading...")

try:
    from mcp.server.fastmcp import Context, FastMCP

    # Use the canonical Image type
    from mcp.server.fastmcp.utilities.types import Image

    logger.debug("Successfully imported FastMCP")
except ImportError as e:
    logger.error(f"Failed to import FastMCP: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from agent import ComputerAgent
    from computer import Computer

    logger.debug("Successfully imported Computer and Agent modules")
except ImportError as e:
    logger.error(f"Failed to import Computer/Agent modules: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from .session_manager import (
        get_session_manager,
        initialize_session_manager,
        shutdown_session_manager,
    )

    logger.debug("Successfully imported session manager")
except ImportError as e:
    logger.error(f"Failed to import session manager: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)


def get_env_bool(key: str, default: bool = False) -> bool:
    """Get boolean value from environment variable."""
    return os.getenv(key, str(default)).lower() in ("true", "1", "yes")


async def _maybe_call_ctx_method(ctx: Context, method_name: str, *args, **kwargs) -> None:
    """Call a context helper if it exists, awaiting the result when necessary."""
    method = getattr(ctx, method_name, None)
    if not callable(method):
        return
    result = method(*args, **kwargs)
    if inspect.isawaitable(result):
        await result


def _normalise_message_content(content: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Normalise message content to a list of structured parts."""
    if isinstance(content, list):
        return content
    if content is None:
        return []
    return [{"type": "output_text", "text": str(content)}]


def _extract_text_from_content(content: Union[str, List[Dict[str, Any]]]) -> str:
    """Extract textual content for inclusion in the aggregated result string."""
    if isinstance(content, str):
        return content
    texts: List[str] = []
    for part in content or []:
        if not isinstance(part, dict):
            continue
        if part.get("type") in {"output_text", "text"} and part.get("text"):
            texts.append(str(part["text"]))
    return "\n".join(texts)


def _serialise_tool_content(content: Any) -> str:
    """Convert tool outputs into a string for aggregation."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts: List[str] = []
        for part in content:
            if (
                isinstance(part, dict)
                and part.get("type") in {"output_text", "text"}
                and part.get("text")
            ):
                texts.append(str(part["text"]))
        if texts:
            return "\n".join(texts)
    if content is None:
        return ""
    return str(content)


def serve() -> FastMCP:
    """Create and configure the MCP server."""
    # NOTE: Do not pass model_config here; FastMCP 2.12.x doesn't support it.
    server = FastMCP(name="cua-agent")

    @server.tool(structured_output=False)
    async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
        """
        Take a screenshot of the current macOS VM screen and return the image.

        Args:
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()

        async with session_manager.get_session(session_id) as session:
            screenshot = await session.computer.interface.screenshot()
            # Returning Image object is fine when structured_output=False
            return Image(format="png", data=screenshot)

    @server.tool(structured_output=False)
    async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
        """
        Run a Computer-Use Agent (CUA) task in a macOS VM and return (combined text, final screenshot).

        Args:
            task: The task description for the agent to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()
        task_id = str(uuid.uuid4())

        try:
            logger.info(f"Starting CUA task: {task} (task_id: {task_id})")

            async with session_manager.get_session(session_id) as session:
                # Register this task with the session
                await session_manager.register_task(session.session_id, task_id)

                try:
                    # Get model name
                    model_name = os.getenv("CUA_MODEL_NAME", "anthropic/claude-sonnet-4-5-20250929")
                    logger.info(f"Using model: {model_name}")

                    # Create agent with the new v0.4.x API
                    agent = ComputerAgent(
                        model=model_name,
                        only_n_most_recent_images=int(os.getenv("CUA_MAX_IMAGES", "3")),
                        verbosity=logging.INFO,
                        tools=[session.computer],
                    )

                    messages = [{"role": "user", "content": task}]

                    # Collect all results
                    aggregated_messages: List[str] = []
                    async for result in agent.run(messages):
                        logger.info("Agent processing step")
                        ctx.info("Agent processing step")

                        outputs = result.get("output", [])
                        for output in outputs:
                            output_type = output.get("type")

                            if output_type == "message":
                                logger.debug("Streaming assistant message: %s", output)
                                content = _normalise_message_content(output.get("content"))
                                aggregated_text = _extract_text_from_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                await _maybe_call_ctx_method(
                                    ctx,
                                    "yield_message",
                                    role=output.get("role", "assistant"),
                                    content=content,
                                )

                            elif output_type in {"tool_use", "computer_call", "function_call"}:
                                logger.debug("Streaming tool call: %s", output)
                                call_id = output.get("id") or output.get("call_id")
                                tool_name = output.get("name") or output.get("action", {}).get(
                                    "type"
                                )
                                tool_input = (
                                    output.get("input")
                                    or output.get("arguments")
                                    or output.get("action")
                                )
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_call",
                                        name=tool_name,
                                        call_id=call_id,
                                        input=tool_input,
                                    )

                            elif output_type in {
                                "tool_result",
                                "computer_call_output",
                                "function_call_output",
                            }:
                                logger.debug("Streaming tool output: %s", output)
                                call_id = output.get("call_id") or output.get("id")
                                content = output.get("content") or output.get("output")
                                aggregated_text = _serialise_tool_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_output",
                                        call_id=call_id,
                                        output=content,
                                        is_error=output.get("status") == "failed"
                                        or output.get("is_error", False),
                                    )

                    logger.info("CUA task completed successfully")
                    ctx.info("CUA task completed successfully")

                    screenshot_image = Image(
                        format="png",
                        data=await session.computer.interface.screenshot(),
                    )

                    return (
                        "\n".join(aggregated_messages).strip()
                        or "Task completed with no text output.",
                        screenshot_image,
                    )

                finally:
                    # Unregister the task from the session
                    await session_manager.unregister_task(session.session_id, task_id)

        except Exception as e:
            error_msg = f"Error running CUA task: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            await ctx.error(error_msg)

            # Try to get a screenshot from the session if available
            try:
                if session_id:
                    async with session_manager.get_session(session_id) as session:
                        screenshot = await session.computer.interface.screenshot()
                        return (
                            f"Error during task execution: {str(e)}",
                            Image(format="png", data=screenshot),
                        )
            except Exception:
                pass

            # If we can't get a screenshot, return a placeholder
            return (
                f"Error during task execution: {str(e)}",
                Image(format="png", data=b""),
            )

    @server.tool(structured_output=False)
    async def run_multi_cua_tasks(
        ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False
    ) -> Any:
        """
        Run multiple CUA tasks and return a list of (combined text, screenshot).

        Args:
            tasks: List of task descriptions to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
            concurrent: If True, run tasks concurrently. If False, run sequentially (default).
        """
        total_tasks = len(tasks)
        if total_tasks == 0:
            await ctx.report_progress(1.0)
            return []

        session_manager = get_session_manager()

        if concurrent and total_tasks > 1:
            # Run tasks concurrently
            logger.info(f"Running {total_tasks} tasks concurrently")
            ctx.info(f"Running {total_tasks} tasks concurrently")

            # Create tasks with progress tracking
            async def run_task_with_progress(
                task_index: int, task: str
            ) -> Tuple[int, Tuple[str, Image]]:
                await ctx.report_progress(task_index / total_tasks)
                result = await run_cua_task(ctx, task, session_id)
                await ctx.report_progress((task_index + 1) / total_tasks)
                return task_index, result

            # Create all task coroutines
            task_coroutines = [run_task_with_progress(i, task) for i, task in enumerate(tasks)]

            # Wait for all tasks to complete
            results_with_indices = await asyncio.gather(*task_coroutines, return_exceptions=True)

            # gather(return_exceptions=True) preserves input order; handle exceptions
            results: List[Tuple[str, Image]] = []
            for result in results_with_indices:
                if isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {result}")
                    ctx.error(f"Task failed: {str(result)}")
                    results.append((f"Task failed: {str(result)}", Image(format="png", data=b"")))
                else:
                    _, task_result = result
                    results.append(task_result)

            return results
        else:
            # Run tasks sequentially (original behavior)
            logger.info(f"Running {total_tasks} tasks sequentially")
            ctx.info(f"Running {total_tasks} tasks sequentially")

            results: List[Tuple[str, Image]] = []
            for i, task in enumerate(tasks):
                logger.info(f"Running task {i+1}/{total_tasks}: {task}")
                ctx.info(f"Running task {i+1}/{total_tasks}: {task}")

                ctx.report_progress(i / total_tasks)
                task_result = await run_cua_task(ctx, task, session_id)
                results.append(task_result)
                ctx.report_progress((i + 1) / total_tasks)

            return results

    @server.tool(structured_output=False)
    async def get_session_stats(ctx: Context) -> Dict[str, Any]:
        """
        Get statistics about active sessions and resource usage.
        """
        session_manager = get_session_manager()
        return session_manager.get_session_stats()

    @server.tool(structured_output=False)
    async def cleanup_session(ctx: Context, session_id: str) -> str:
        """
        Cleanup a specific session and release its resources.

        Args:
            session_id: The session ID to cleanup
        """
        session_manager = get_session_manager()
        await session_manager.cleanup_session(session_id)
        return f"Session {session_id} cleanup initiated"

    return server


server = serve()


async def run_server():
    """Run the MCP server with proper lifecycle management."""
    session_manager = None
    try:
        logger.debug("Starting MCP server...")

        # Initialize session manager
        session_manager = await initialize_session_manager()
        logger.info("Session manager initialized")

        # Set up signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            # Create a task to shutdown gracefully
            asyncio.create_task(graceful_shutdown())

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Start the server
        logger.info("Starting FastMCP server...")
        # Use run_stdio_async directly instead of server.run() to avoid nested event loops
        await server.run_stdio_async()

    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        raise
    finally:
        # Ensure cleanup happens
        if session_manager:
            logger.info("Shutting down session manager...")
            await shutdown_session_manager()


async def graceful_shutdown():
    """Gracefully shutdown the server and all sessions."""
    logger.info("Initiating graceful shutdown...")
    try:
        await shutdown_session_manager()
        logger.info("Graceful shutdown completed")
    except Exception as e:
        logger.error(f"Error during graceful shutdown: {e}")
    finally:
        # Exit the process
        import os

        os._exit(0)


def main():
    """Run the MCP server with proper async lifecycle management."""
    try:
        # Use anyio.run instead of asyncio.run to avoid nested event loop issues
        anyio.run(run_server)
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/libs/lume/src/Commands/Logs.swift:
--------------------------------------------------------------------------------

```swift
import ArgumentParser
import Foundation

struct Logs: ParsableCommand {
    static let configuration = CommandConfiguration(
        abstract: "View lume serve logs",
        subcommands: [Info.self, Error.self, All.self],
        defaultSubcommand: All.self
    )
    
    // Common functionality for reading log files
    static func readLogFile(path: String, lines: Int? = nil) -> String {
        let fileManager = FileManager.default
        
        // Check if file exists
        guard fileManager.fileExists(atPath: path) else {
            return "Log file not found at \(path)"
        }
        
        do {
            // Read file content
            let content = try String(contentsOfFile: path, encoding: .utf8)
            
            // If lines parameter is provided, return only the specified number of lines from the end
            if let lineCount = lines {
                let allLines = content.components(separatedBy: .newlines)
                let startIndex = max(0, allLines.count - lineCount)
                let lastLines = Array(allLines[startIndex...])
                return lastLines.joined(separator: "\n")
            }
            
            return content
        } catch {
            return "Error reading log file: \(error.localizedDescription)"
        }
    }
    
    // Method for tailing a log file (follows new content as it is appended, like tail -f)
    static func tailLogFile(path: String, initialLines: Int? = 10) {
        let fileManager = FileManager.default
        
        // Check if file exists
        guard fileManager.fileExists(atPath: path) else {
            print("Log file not found at \(path)")
            return
        }
        
        do {
            // Get initial content with only the specified number of lines from the end
            var lastPosition: UInt64 = 0
            let fileHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: path))
            
            // First, print the last few lines of the file
            if let lines = initialLines {
                let content = try String(contentsOfFile: path, encoding: .utf8)
                let allLines = content.components(separatedBy: .newlines)
                let startIndex = max(0, allLines.count - lines)
                let lastLines = Array(allLines[startIndex...])
                print(lastLines.joined(separator: "\n"))
            }
            
            // Get current file size
            lastPosition = UInt64(try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0)
            
            // Set up for continuous monitoring
            print("\nTailing log file... Press Ctrl+C to stop")
            
            // Monitor file for changes
            while true {
                // Brief pause to reduce CPU usage
                Thread.sleep(forTimeInterval: 0.5)
                
                // Get current size
                let currentSize = try fileManager.attributesOfItem(atPath: path)[.size] as? UInt64 ?? 0
                
                // If file has grown
                if currentSize > lastPosition {
                    // Seek to where we last read
                    fileHandle.seek(toFileOffset: lastPosition)
                    
                    // Read new content
                    if let newData = try? fileHandle.readToEnd() {
                        if let newContent = String(data: newData, encoding: .utf8) {
                            // Print new content without trailing newline
                            if newContent.hasSuffix("\n") {
                                print(newContent, terminator: "")
                            } else {
                                print(newContent)
                            }
                        }
                    }
                    
                    // Update position
                    lastPosition = currentSize
                }
                
                // Handle file rotation (if file became smaller)
                else if currentSize < lastPosition {
                    // File was probably rotated, start from beginning
                    lastPosition = 0
                    fileHandle.seek(toFileOffset: 0)
                    
                    if let newData = try? fileHandle.readToEnd() {
                        if let newContent = String(data: newData, encoding: .utf8) {
                            print(newContent, terminator: "")
                        }
                    }
                    
                    lastPosition = currentSize
                }
            }
        } catch {
            print("Error tailing log file: \(error.localizedDescription)")
        }
    }
    
    // MARK: - Info Logs Subcommand
    
    struct Info: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "info",
            abstract: "View info logs from the daemon"
        )
        
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
        var lines: Int?
        
        @Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
        var follow: Bool = false
        
        func run() throws {
            let logPath = "/tmp/lume_daemon.log"
            
            print("=== Info Logs ===")
            
            if follow {
                // Use tailing functionality to continuously monitor the log
                Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
            } else {
                // Regular one-time viewing of logs
                let content = Logs.readLogFile(path: logPath, lines: lines)
                print(content)
            }
        }
    }
    
    // MARK: - Error Logs Subcommand
    
    struct Error: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "error",
            abstract: "View error logs from the daemon"
        )
        
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of the file")
        var lines: Int?
        
        @Flag(name: .shortAndLong, help: "Follow log file continuously (like tail -f)")
        var follow: Bool = false
        
        func run() throws {
            let logPath = "/tmp/lume_daemon.error.log"
            
            print("=== Error Logs ===")
            
            if follow {
                // Use tailing functionality to continuously monitor the log
                Logs.tailLogFile(path: logPath, initialLines: lines ?? 10)
            } else {
                // Regular one-time viewing of logs
                let content = Logs.readLogFile(path: logPath, lines: lines)
                print(content)
            }
        }
    }
    
    // MARK: - All Logs Subcommand
    
    struct All: ParsableCommand {
        static let configuration = CommandConfiguration(
            commandName: "all",
            abstract: "View both info and error logs from the daemon"
        )
        
        @Option(name: .shortAndLong, help: "Number of lines to display from the end of each file")
        var lines: Int?
        
        @Flag(name: .shortAndLong, help: "Follow log files continuously (like tail -f)")
        var follow: Bool = false
        
        // Custom implementation to tail both logs simultaneously
        private func tailBothLogs(infoPath: String, errorPath: String, initialLines: Int? = 10) {
            let fileManager = FileManager.default
            var infoExists = fileManager.fileExists(atPath: infoPath)
            var errorExists = fileManager.fileExists(atPath: errorPath)
            
            if !infoExists && !errorExists {
                print("Neither info nor error log files found")
                return
            }
            
            // Print initial content
            print("=== Info Logs ===")
            if infoExists {
                if let lines = initialLines {
                    let content = (try? String(contentsOfFile: infoPath, encoding: .utf8)) ?? ""
                    let allLines = content.components(separatedBy: .newlines)
                    let startIndex = max(0, allLines.count - lines)
                    let lastLines = Array(allLines[startIndex...])
                    print(lastLines.joined(separator: "\n"))
                }
            } else {
                print("Info log file not found")
            }
            
            print("\n=== Error Logs ===")
            if errorExists {
                if let lines = initialLines {
                    let content = (try? String(contentsOfFile: errorPath, encoding: .utf8)) ?? ""
                    let allLines = content.components(separatedBy: .newlines)
                    let startIndex = max(0, allLines.count - lines)
                    let lastLines = Array(allLines[startIndex...])
                    print(lastLines.joined(separator: "\n"))
                }
            } else {
                print("Error log file not found")
            }
            
            print("\nTailing both log files... Press Ctrl+C to stop")
            
            // Initialize file handles and positions
            var infoHandle: FileHandle? = nil
            var errorHandle: FileHandle? = nil
            var infoPosition: UInt64 = 0
            var errorPosition: UInt64 = 0
            
            // Set up file handles
            if infoExists {
                do {
                    infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath))
                    infoPosition = UInt64(try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0)
                } catch {
                    print("Error opening info log file: \(error.localizedDescription)")
                }
            }
            
            if errorExists {
                do {
                    errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath))
                    errorPosition = UInt64(try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 0)
                } catch {
                    print("Error opening error log file: \(error.localizedDescription)")
                }
            }
            
            // Monitor both files for changes
            while true {
                Thread.sleep(forTimeInterval: 0.5)
                
                // Check for new content in info log
                if let handle = infoHandle {
                    do {
                        // Re-check existence in case file was deleted
                        infoExists = fileManager.fileExists(atPath: infoPath)
                        if !infoExists {
                            print("\n[Info log file was removed]")
                            infoHandle = nil
                            continue
                        }
                        
                        let currentSize = try fileManager.attributesOfItem(atPath: infoPath)[.size] as? UInt64 ?? 0
                        
                        if currentSize > infoPosition {
                            handle.seek(toFileOffset: infoPosition)
                            if let newData = try? handle.readToEnd() {
                                if let newContent = String(data: newData, encoding: .utf8) {
                                    print("\n--- New Info Log Content ---")
                                    if newContent.hasSuffix("\n") {
                                        print(newContent, terminator: "")
                                    } else {
                                        print(newContent)
                                    }
                                }
                            }
                            infoPosition = currentSize
                        } else if currentSize < infoPosition {
                            // File was rotated
                            print("\n[Info log was rotated]")
                            infoPosition = 0
                            handle.seek(toFileOffset: 0)
                            if let newData = try? handle.readToEnd() {
                                if let newContent = String(data: newData, encoding: .utf8) {
                                    print("\n--- New Info Log Content ---")
                                    print(newContent, terminator: "")
                                }
                            }
                            infoPosition = currentSize
                        }
                    } catch {
                        print("\nError reading info log: \(error.localizedDescription)")
                    }
                } else if fileManager.fileExists(atPath: infoPath) && !infoExists {
                    // File exists again after being deleted
                    do {
                        infoHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: infoPath))
                        infoPosition = 0
                        infoExists = true
                        print("\n[Info log file reappeared]")
                    } catch {
                        print("\nError reopening info log: \(error.localizedDescription)")
                    }
                }
                
                // Check for new content in error log
                if let handle = errorHandle {
                    do {
                        // Re-check existence in case file was deleted
                        errorExists = fileManager.fileExists(atPath: errorPath)
                        if !errorExists {
                            print("\n[Error log file was removed]")
                            errorHandle = nil
                            continue
                        }
                        
                        let currentSize = try fileManager.attributesOfItem(atPath: errorPath)[.size] as? UInt64 ?? 0
                        
                        if currentSize > errorPosition {
                            handle.seek(toFileOffset: errorPosition)
                            if let newData = try? handle.readToEnd() {
                                if let newContent = String(data: newData, encoding: .utf8) {
                                    print("\n--- New Error Log Content ---")
                                    if newContent.hasSuffix("\n") {
                                        print(newContent, terminator: "")
                                    } else {
                                        print(newContent)
                                    }
                                }
                            }
                            errorPosition = currentSize
                        } else if currentSize < errorPosition {
                            // File was rotated
                            print("\n[Error log was rotated]")
                            errorPosition = 0
                            handle.seek(toFileOffset: 0)
                            if let newData = try? handle.readToEnd() {
                                if let newContent = String(data: newData, encoding: .utf8) {
                                    print("\n--- New Error Log Content ---")
                                    print(newContent, terminator: "")
                                }
                            }
                            errorPosition = currentSize
                        }
                    } catch {
                        print("\nError reading error log: \(error.localizedDescription)")
                    }
                } else if fileManager.fileExists(atPath: errorPath) && !errorExists {
                    // File exists again after being deleted
                    do {
                        errorHandle = try FileHandle(forReadingFrom: URL(fileURLWithPath: errorPath))
                        errorPosition = 0
                        errorExists = true
                        print("\n[Error log file reappeared]")
                    } catch {
                        print("\nError reopening error log: \(error.localizedDescription)")
                    }
                }
            }
        }
        
        func run() throws {
            let infoLogPath = "/tmp/lume_daemon.log"
            let errorLogPath = "/tmp/lume_daemon.error.log"
            
            if follow {
                // Use custom tailing implementation for both logs
                tailBothLogs(infoPath: infoLogPath, errorPath: errorLogPath, initialLines: lines ?? 10)
            } else {
                // Regular one-time viewing of logs
                let infoContent = Logs.readLogFile(path: infoLogPath, lines: lines)
                let errorContent = Logs.readLogFile(path: errorLogPath, lines: lines)
                
                print("=== Info Logs ===")
                print(infoContent)
                print("\n=== Error Logs ===")
                print(errorContent)
            }
        }
    }
}

```

--------------------------------------------------------------------------------
/examples/som_examples.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example script demonstrating the usage of OmniParser's UI element detection functionality.
This script shows how to:
1. Initialize the OmniParser
2. Load and process images
3. Visualize detection results
4. Compare performance between CPU and MPS (Apple Silicon)
"""

import argparse
import base64
import glob
import io
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
from PIL import Image

# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
    if path and path not in sys.path:
        sys.path.append(path)
        print(f"Added to sys.path: {path}")

# Add the libs directory to the path to find som
libs_path = project_root / "libs"
if str(libs_path) not in sys.path:
    sys.path.append(str(libs_path))
    print(f"Added to sys.path: {libs_path}")

from som import IconElement, OmniParser, ParseResult, TextElement
from som.models import BoundingBox, ParserMetadata, UIElement

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


def setup_logging():
    """Configure logging with a nice format."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


class Timer:
    """Enhanced context manager for timing code blocks."""

    def __init__(self, name: str, logger):
        self.name = name
        self.logger = logger
        self.start_time: float = 0.0
        self.elapsed_time: float = 0.0

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, *args):
        self.elapsed_time = time.time() - self.start_time
        self.logger.info(f"{self.name}: {self.elapsed_time:.3f}s")
        return False
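
# Example usage (sketch): time any block and read back its duration.
#     with Timer("parse screenshot", logger) as t:
#         do_work()                    # hypothetical workload
#     logger.info(f"elapsed: {t.elapsed_time:.3f}s")  # also logged on __exit__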


def image_to_bytes(image: Image.Image) -> bytes:
    """Convert PIL Image to PNG bytes."""
    buf = io.BytesIO()
    image.save(buf, format="PNG")
    return buf.getvalue()


def process_image(
    parser: OmniParser, image_path: str, output_dir: Path, use_ocr: bool = False
) -> None:
    """Process a single image and save the result."""
    try:
        # Load image
        logger.info(f"Processing image: {image_path}")
        image = Image.open(image_path).convert("RGB")
        logger.info(f"Image loaded successfully, size: {image.size}")

        # Create output filename
        input_filename = Path(image_path).stem
        output_path = output_dir / f"{input_filename}_analyzed.png"

        # Convert image to PNG bytes
        image_bytes = image_to_bytes(image)

        # Process image
        with Timer(f"Processing {input_filename}", logger):
            result = parser.parse(image_bytes, use_ocr=use_ocr)
            logger.info(
                f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
            )

            # Save the annotated image
            logger.info(f"Saving annotated image to: {output_path}")
            try:
                # Save image from base64
                img_data = base64.b64decode(result.annotated_image_base64)
                img = Image.open(io.BytesIO(img_data))
                img.save(output_path)

                # Print detailed results
                logger.info("\nDetected Elements:")
                for elem in result.elements:
                    if isinstance(elem, IconElement):
                        logger.info(
                            f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                        )
                    elif isinstance(elem, TextElement):
                        logger.info(
                            f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                        )

                # Verify file exists and log size
                if output_path.exists():
                    logger.info(
                        f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
                    )
                else:
                    logger.error(f"Failed to verify file at {output_path}")
            except Exception as e:
                logger.error(f"Error saving image: {str(e)}", exc_info=True)

    except Exception as e:
        logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)


def run_detection_benchmark(
    input_path: str,
    output_dir: Path,
    use_ocr: bool = False,
    box_threshold: float = 0.01,
    iou_threshold: float = 0.1,
):
    """Run detection benchmark on images."""
    logger.info(
        f"Starting benchmark with OCR enabled: {use_ocr}, box_threshold: {box_threshold}, iou_threshold: {iou_threshold}"
    )

    try:
        # Initialize parser
        logger.info("Initializing OmniParser...")
        parser = OmniParser()

        # Create output directory
        output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Output directory created at: {output_dir}")

        # Get list of PNG files
        if os.path.isdir(input_path):
            image_files = glob.glob(os.path.join(input_path, "*.png"))
        else:
            image_files = [input_path]

        logger.info(f"Found {len(image_files)} images to process")

        # Process each image with specified thresholds
        for image_path in image_files:
            try:
                # Load image
                logger.info(f"Processing image: {image_path}")
                image = Image.open(image_path).convert("RGB")
                logger.info(f"Image loaded successfully, size: {image.size}")

                # Create output filename
                input_filename = Path(image_path).stem
                output_path = output_dir / f"{input_filename}_analyzed.png"

                # Convert image to PNG bytes
                image_bytes = image_to_bytes(image)

                # Process image with specified thresholds
                with Timer(f"Processing {input_filename}", logger):
                    result = parser.parse(
                        image_bytes,
                        use_ocr=use_ocr,
                        box_threshold=box_threshold,
                        iou_threshold=iou_threshold,
                    )
                    logger.info(
                        f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
                    )

                    # Save the annotated image
                    logger.info(f"Saving annotated image to: {output_path}")
                    try:
                        # Save image from base64
                        img_data = base64.b64decode(result.annotated_image_base64)
                        img = Image.open(io.BytesIO(img_data))
                        img.save(output_path)

                        # Print detailed results
                        logger.info("\nDetected Elements:")
                        for elem in result.elements:
                            if isinstance(elem, IconElement):
                                logger.info(
                                    f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                                )
                            elif isinstance(elem, TextElement):
                                logger.info(
                                    f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                                )

                        # Verify file exists and log size
                        if output_path.exists():
                            logger.info(
                                f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
                            )
                        else:
                            logger.error(f"Failed to verify file at {output_path}")
                    except Exception as e:
                        logger.error(f"Error saving image: {str(e)}", exc_info=True)

            except Exception as e:
                logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)

    except Exception as e:
        logger.error(f"Benchmark failed: {str(e)}", exc_info=True)
        raise


def run_experiments(input_path: str, output_dir: Path, use_ocr: bool = False):
    """Run experiments with different threshold combinations."""
    # Define threshold values to test
    box_thresholds = [0.01, 0.05, 0.1, 0.3]
    iou_thresholds = [0.05, 0.1, 0.2, 0.5]

    logger.info("Starting threshold experiments...")
    logger.info("Box thresholds to test: %s", box_thresholds)
    logger.info("IOU thresholds to test: %s", iou_thresholds)

    # Create results directory for this experiment
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    ocr_suffix = "_ocr" if use_ocr else "_no_ocr"
    exp_dir = output_dir / f"experiment_{timestamp}{ocr_suffix}"
    exp_dir.mkdir(parents=True, exist_ok=True)

    # Create a summary file
    summary_file = exp_dir / "results_summary.txt"
    with open(summary_file, "w") as f:
        f.write("Threshold Experiments Results\n")
        f.write("==========================\n\n")
        f.write(f"Input: {input_path}\n")
        f.write(f"OCR Enabled: {use_ocr}\n")
        f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("Results:\n")
        f.write("-" * 80 + "\n")
        f.write(
            f"{'Box Thresh':^10} | {'IOU Thresh':^10} | {'Num Icons':^10} | {'Num Text':^10} | {'Time (s)':^10}\n"
        )
        f.write("-" * 80 + "\n")

        # Initialize parser once for all experiments
        parser = OmniParser()

        # Run experiments with each combination
        for box_thresh in box_thresholds:
            for iou_thresh in iou_thresholds:
                logger.info(f"\nTesting box_threshold={box_thresh}, iou_threshold={iou_thresh}")

                # Create directory for this combination
                combo_dir = exp_dir / f"box_{box_thresh}_iou_{iou_thresh}"
                combo_dir.mkdir(exist_ok=True)

                try:
                    # Process each image
                    if os.path.isdir(input_path):
                        image_files = glob.glob(os.path.join(input_path, "*.png"))
                    else:
                        image_files = [input_path]

                    total_icons = 0
                    total_text = 0
                    total_time = 0

                    for image_path in image_files:
                        # Load and process image
                        image = Image.open(image_path).convert("RGB")
                        image_bytes = image_to_bytes(image)

                        # Process with current thresholds
                        with Timer(f"Processing {Path(image_path).stem}", logger) as t:
                            result = parser.parse(
                                image_bytes,
                                use_ocr=use_ocr,
                                box_threshold=box_thresh,
                                iou_threshold=iou_thresh,
                            )

                            # Save annotated image
                            output_path = combo_dir / f"{Path(image_path).stem}_analyzed.png"
                            img_data = base64.b64decode(result.annotated_image_base64)
                            img = Image.open(io.BytesIO(img_data))
                            img.save(output_path)

                            # Update totals
                            total_icons += result.metadata.num_icons
                            total_text += result.metadata.num_text

                            # Log detailed results
                            detail_file = combo_dir / f"{Path(image_path).stem}_details.txt"
                            with open(detail_file, "w") as detail_f:
                                detail_f.write(f"Results for {Path(image_path).name}\n")
                                detail_f.write("-" * 40 + "\n")
                                detail_f.write(f"Number of icons: {result.metadata.num_icons}\n")
                                detail_f.write(
                                    f"Number of text elements: {result.metadata.num_text}\n\n"
                                )

                                detail_f.write("Icon Detections:\n")
                                icon_count = 1
                                text_count = (
                                    result.metadata.num_icons + 1
                                )  # Text boxes start after icons

                                # First list all icons
                                for elem in result.elements:
                                    if isinstance(elem, IconElement):
                                        detail_f.write(f"Box #{icon_count}: Icon\n")
                                        detail_f.write(f"  - Confidence: {elem.confidence:.3f}\n")
                                        detail_f.write(
                                            f"  - Coordinates: {elem.bbox.coordinates}\n"
                                        )
                                        icon_count += 1

                                if use_ocr:
                                    detail_f.write("\nText Detections:\n")
                                    for elem in result.elements:
                                        if isinstance(elem, TextElement):
                                            detail_f.write(f"Box #{text_count}: Text\n")
                                            detail_f.write(f"  - Content: '{elem.content}'\n")
                                            detail_f.write(
                                                f"  - Confidence: {elem.confidence:.3f}\n"
                                            )
                                            detail_f.write(
                                                f"  - Coordinates: {elem.bbox.coordinates}\n"
                                            )
                                            text_count += 1

                        # Update timing totals
                        total_time += t.elapsed_time

                    # Write summary for this combination
                    avg_time = total_time / max(len(image_files), 1)  # guard against a directory with no PNGs
                    f.write(
                        f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {total_icons:^10d} | {total_text:^10d} | {avg_time:^10.3f}\n"
                    )

                except Exception as e:
                    logger.error(
                        f"Error in experiment box={box_thresh}, iou={iou_thresh}: {str(e)}"
                    )
                    f.write(
                        f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {'ERROR':^10s} | {'ERROR':^10s} | {'ERROR':^10s}\n"
                    )

        # Write summary footer
        f.write("-" * 80 + "\n")
        f.write("\nExperiment completed successfully!\n")

    logger.info(f"\nExperiment results saved to {exp_dir}")
    logger.info(f"Summary file: {summary_file}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Run OmniParser benchmark")
    parser.add_argument("input_path", help="Path to input image or directory containing images")
    parser.add_argument(
        "--output-dir", default="examples/output", help="Output directory for annotated images"
    )
    parser.add_argument(
        "--ocr",
        choices=["none", "easyocr"],
        default="none",
        help="OCR engine to use (default: none)",
    )
    parser.add_argument(
        "--mode",
        choices=["single", "experiment"],
        default="single",
        help="Run mode: single run or threshold experiments (default: single)",
    )
    parser.add_argument(
        "--box-threshold",
        type=float,
        default=0.01,
        help="Confidence threshold for detection (default: 0.01)",
    )
    parser.add_argument(
        "--iou-threshold",
        type=float,
        default=0.1,
        help="IOU threshold for Non-Maximum Suppression (default: 0.1)",
    )
    args = parser.parse_args()

    logger.info(f"Starting OmniParser with arguments: {args}")
    use_ocr = args.ocr != "none"
    output_dir = Path(args.output_dir)

    try:
        if args.mode == "experiment":
            run_experiments(args.input_path, output_dir, use_ocr)
        else:
            run_detection_benchmark(
                args.input_path, output_dir, use_ocr, args.box_threshold, args.iou_threshold
            )
    except Exception as e:
        logger.error(f"Process failed: {str(e)}", exc_info=True)
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())

```
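
A minimal programmatic invocation of the benchmark above, as a sketch (the `screenshots/` input directory is an illustrative assumption, and the import assumes you run from the `examples/` directory):

```python
# Hedged sketch: drive the example's entry points directly instead of the CLI.
from pathlib import Path

from som_examples import run_detection_benchmark, run_experiments  # this file

run_detection_benchmark(
    input_path="screenshots",            # directory of *.png files (assumed)
    output_dir=Path("examples/output"),
    use_ocr=True,
    box_threshold=0.01,
    iou_threshold=0.1,
)
# Or sweep box/IOU thresholds and write a results_summary.txt per experiment:
run_experiments("screenshots", Path("examples/output"), use_ocr=True)
```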

--------------------------------------------------------------------------------
/libs/python/som/som/detect.py:
--------------------------------------------------------------------------------

```python
import argparse
import base64
import io
import logging
import signal
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

import cv2
import numpy as np
import supervision as sv
import torch
import torchvision.ops
import torchvision.transforms as T
from huggingface_hub import hf_hub_download
from PIL import Image
from supervision.detection.core import Detections
from ultralytics import YOLO

from .detection import DetectionProcessor
from .models import (
    BoundingBox,
    IconElement,
    ParseResult,
    ParserMetadata,
    TextElement,
    UIElement,
)
from .ocr import OCRProcessor
from .visualization import BoxAnnotator

logger = logging.getLogger(__name__)


class TimeoutException(Exception):
    pass


@contextmanager
def timeout(seconds: int):
    def timeout_handler(signum, frame):
        raise TimeoutException("OCR process timed out")

    # Register the signal handler
    original_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)

    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, original_handler)
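
# Usage note (sketch): SIGALRM-based timeouts fire only on Unix main threads.
#     with timeout(5):
#         reader.readtext(image)  # raises TimeoutException after ~5 seconds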


def process_text_box(box, image):
    """Process a single text box with OCR."""
    try:
        from typing import Any, List, Sequence, Tuple

        import easyocr

        x1 = int(min(point[0] for point in box))
        y1 = int(min(point[1] for point in box))
        x2 = int(max(point[0] for point in box))
        y2 = int(max(point[1] for point in box))

        # Add padding
        pad = 2
        x1 = max(0, x1 - pad)
        y1 = max(0, y1 - pad)
        x2 = min(image.shape[1], x2 + pad)
        y2 = min(image.shape[0], y2 + pad)

        region = image[y1:y2, x1:x2]
        if region.size > 0:
            reader = easyocr.Reader(["en"])
            results = reader.readtext(region)
            if results and len(results) > 0:
                # EasyOCR returns a list of tuples (bbox, text, confidence)
                first_result = results[0]
                if isinstance(first_result, (list, tuple)) and len(first_result) >= 3:
                    text = str(first_result[1])
                    confidence = float(first_result[2])
                    if confidence > 0.5:
                        return text, [x1, y1, x2, y2], confidence
    except Exception:
        pass
    return None


def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[float]]]:
    """Check OCR box using EasyOCR."""
    # Read image once
    if isinstance(image_path, str):
        image_path = Path(image_path)

    # Read image into memory
    image_cv = cv2.imread(str(image_path))
    if image_cv is None:
        logger.error(f"Failed to read image: {image_path}")
        return [], []

    # Get image dimensions
    img_height, img_width = image_cv.shape[:2]
    confidence_threshold = 0.5

    # Use EasyOCR
    import ssl

    import easyocr

    # Create unverified SSL context for development
    ssl._create_default_https_context = ssl._create_unverified_context
    try:
        reader = easyocr.Reader(["en"])
        with timeout(5):  # 5 second timeout for EasyOCR
            results = reader.readtext(image_cv, paragraph=False, text_threshold=0.5)
    except TimeoutException:
        logger.warning("EasyOCR timed out, returning no results")
        return [], []
    except Exception as e:
        logger.warning(f"EasyOCR failed: {str(e)}")
        return [], []
    finally:
        # Restore default SSL context
        ssl._create_default_https_context = ssl.create_default_context

    texts = []
    boxes = []

    for box, text, conf in results:
        # Convert box format to [x1, y1, x2, y2]
        x1 = min(point[0] for point in box)
        y1 = min(point[1] for point in box)
        x2 = max(point[0] for point in box)
        y2 = max(point[1] for point in box)

        if float(conf) > confidence_threshold:  # Only keep higher-confidence detections
            texts.append(text)
            boxes.append([x1, y1, x2, y2])

    return texts, boxes


class OmniParser:
    """Enhanced UI parser using computer vision and OCR for detecting interactive elements."""

    def __init__(
        self,
        model_path: Optional[Union[str, Path]] = None,
        cache_dir: Optional[Union[str, Path]] = None,
        force_device: Optional[str] = None,
    ):
        """Initialize the OmniParser.

        Args:
            model_path: Optional path to the YOLO model
            cache_dir: Optional directory to cache model files
            force_device: Force specific device (cpu/cuda/mps)
        """
        self.detector = DetectionProcessor(
            model_path=Path(model_path) if model_path else None,
            cache_dir=Path(cache_dir) if cache_dir else None,
            force_device=force_device,
        )
        self.ocr = OCRProcessor()
        self.visualizer = BoxAnnotator()

    def process_image(
        self,
        image: Image.Image,
        box_threshold: float = 0.3,
        iou_threshold: float = 0.1,
        use_ocr: bool = True,
    ) -> Tuple[Image.Image, List[UIElement]]:
        """Process an image to detect UI elements and optionally text.

        Args:
            image: Input PIL Image
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            use_ocr: Whether to enable OCR processing

        Returns:
            Tuple of (annotated image, list of detections)
        """
        try:
            logger.info("Starting UI element detection...")

            # Detect icons
            icon_detections = self.detector.detect_icons(
                image=image, box_threshold=box_threshold, iou_threshold=iou_threshold
            )
            logger.info(f"Found {len(icon_detections)} interactive elements")

            # Convert icon detections to typed objects
            elements: List[UIElement] = cast(
                List[UIElement],
                [
                    IconElement(
                        id=i + 1,
                        bbox=BoundingBox(
                            x1=det["bbox"][0],
                            y1=det["bbox"][1],
                            x2=det["bbox"][2],
                            y2=det["bbox"][3],
                        ),
                        confidence=det["confidence"],
                        scale=det.get("scale"),
                    )
                    for i, det in enumerate(icon_detections)
                ],
            )

            # Run OCR if enabled
            if use_ocr:
                logger.info("Running OCR detection...")
                text_detections = self.ocr.detect_text(image=image, confidence_threshold=0.5)
                if text_detections is None:
                    text_detections = []
                logger.info(f"Found {len(text_detections)} text regions")

                # Convert text detections to typed objects
                text_elements = cast(
                    List[UIElement],
                    [
                        TextElement(
                            id=len(elements) + i + 1,
                            bbox=BoundingBox(
                                x1=det["bbox"][0],
                                y1=det["bbox"][1],
                                x2=det["bbox"][2],
                                y2=det["bbox"][3],
                            ),
                            content=det["content"],
                            confidence=det["confidence"],
                        )
                        for i, det in enumerate(text_detections)
                    ],
                )

                if elements and text_elements:
                    # Filter out non-OCR elements that have OCR elements with center points colliding with them
                    filtered_elements = []
                    for elem in elements:  # elements at this point contains only non-OCR elements
                        should_keep = True
                        for text_elem in text_elements:
                            # Calculate center point of the text element
                            center_x = (text_elem.bbox.x1 + text_elem.bbox.x2) / 2
                            center_y = (text_elem.bbox.y1 + text_elem.bbox.y2) / 2

                            # Check if this center point is inside the non-OCR element
                            if (
                                center_x >= elem.bbox.x1
                                and center_x <= elem.bbox.x2
                                and center_y >= elem.bbox.y1
                                and center_y <= elem.bbox.y2
                            ):
                                should_keep = False
                                break

                        if should_keep:
                            filtered_elements.append(elem)
                    elements = filtered_elements

                    # Merge detections using NMS
                    all_elements = elements + text_elements
                    boxes = torch.tensor([elem.bbox.coordinates for elem in all_elements])
                    scores = torch.tensor([elem.confidence for elem in all_elements])
                    keep_indices = torchvision.ops.nms(boxes, scores, iou_threshold)
                    elements = [all_elements[i] for i in keep_indices]
                else:
                    # One of the lists is empty, so NMS is unnecessary; just append the text elements
                    elements.extend(text_elements)

            # Calculate drawing parameters based on image size
            box_overlay_ratio = max(image.size) / 3200
            draw_config = {
                "font_size": int(12 * box_overlay_ratio),
                "box_thickness": max(int(2 * box_overlay_ratio), 1),
                "text_padding": max(int(3 * box_overlay_ratio), 1),
            }

            # Convert elements back to dict format for visualization
            detection_dicts = [
                {
                    "type": elem.type,
                    "bbox": elem.bbox.coordinates,
                    "confidence": elem.confidence,
                    "content": elem.content if isinstance(elem, TextElement) else None,
                }
                for elem in elements
            ]

            # Create visualization
            logger.info("Creating visualization...")
            annotated_image = self.visualizer.draw_boxes(
                image=image.copy(), detections=detection_dicts, draw_config=draw_config
            )
            logger.info("Visualization complete")

            return annotated_image, elements

        except Exception as e:
            logger.error(f"Error in process_image: {str(e)}")
            import traceback

            logger.error(traceback.format_exc())
            raise

    def parse(
        self,
        screenshot_data: Union[bytes, str],
        box_threshold: float = 0.3,
        iou_threshold: float = 0.1,
        use_ocr: bool = True,
    ) -> ParseResult:
        """Parse a UI screenshot to detect interactive elements and text.

        Args:
            screenshot_data: Raw bytes or base64 string of the screenshot
            box_threshold: Confidence threshold for detection
            iou_threshold: IOU threshold for NMS
            use_ocr: Whether to enable OCR processing

        Returns:
            ParseResult object containing elements, annotated image, and metadata
        """
        try:
            start_time = time.time()

            # Convert input to PIL Image
            if isinstance(screenshot_data, str):
                screenshot_data = base64.b64decode(screenshot_data)
            image = Image.open(io.BytesIO(screenshot_data)).convert("RGB")

            # Process image
            annotated_image, elements = self.process_image(
                image=image,
                box_threshold=box_threshold,
                iou_threshold=iou_threshold,
                use_ocr=use_ocr,
            )

            # Convert annotated image to base64
            buffered = io.BytesIO()
            annotated_image.save(buffered, format="PNG")
            annotated_image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")

            # Generate screen info text
            screen_info = []
            parsed_content_list = []

            # Set element IDs and generate human-readable descriptions
            for i, elem in enumerate(elements):
                # Set the ID (1-indexed)
                elem.id = i + 1

                if isinstance(elem, IconElement):
                    screen_info.append(
                        f"Box #{i+1}: Icon (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
                    )
                    parsed_content_list.append(
                        {
                            "id": i + 1,
                            "type": "icon",
                            "bbox": elem.bbox.coordinates,
                            "confidence": elem.confidence,
                            "content": None,
                        }
                    )
                elif isinstance(elem, TextElement):
                    screen_info.append(
                        f"Box #{i+1}: Text '{elem.content}' (confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates})"
                    )
                    parsed_content_list.append(
                        {
                            "id": i + 1,
                            "type": "text",
                            "bbox": elem.bbox.coordinates,
                            "confidence": elem.confidence,
                            "content": elem.content,
                        }
                    )

            # Calculate metadata
            latency = time.time() - start_time
            width, height = image.size

            # Create ParseResult object with enhanced properties
            result = ParseResult(
                elements=elements,
                annotated_image_base64=annotated_image_base64,
                screen_info=screen_info,
                parsed_content_list=parsed_content_list,
                metadata=ParserMetadata(
                    image_size=(width, height),
                    num_icons=len([e for e in elements if isinstance(e, IconElement)]),
                    num_text=len([e for e in elements if isinstance(e, TextElement)]),
                    device=self.detector.device,
                    ocr_enabled=use_ocr,
                    latency=latency,
                ),
            )

            # Return the ParseResult object directly
            return result

        except Exception as e:
            logger.error(f"Error in parse: {str(e)}")
            import traceback

            logger.error(traceback.format_exc())
            raise


def main():
    """Command line interface for UI element detection."""
    parser = argparse.ArgumentParser(description="Detect UI elements and text in images")
    parser.add_argument("image_path", help="Path to the input image")
    parser.add_argument("--model-path", help="Path to YOLO model")
    parser.add_argument(
        "--box-threshold", type=float, default=0.3, help="Box confidence threshold (default: 0.3)"
    )
    parser.add_argument(
        "--iou-threshold", type=float, default=0.1, help="IOU threshold (default: 0.1)"
    )
    parser.add_argument(
        "--ocr",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Enable OCR processing (default: enabled; pass --no-ocr to disable)",
    )
    parser.add_argument("--output", help="Output path for annotated image")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize parser
        parser = OmniParser(model_path=args.model_path)

        # Load and process image
        logger.info(f"Loading image from: {args.image_path}")
        image = Image.open(args.image_path).convert("RGB")
        logger.info(f"Image loaded successfully, size: {image.size}")

        # Process image
        annotated_image, elements = parser.process_image(
            image=image,
            box_threshold=args.box_threshold,
            iou_threshold=args.iou_threshold,
            use_ocr=args.ocr,
        )

        # Save output image
        output_path = args.output or str(
            Path(args.image_path).parent
            / f"{Path(args.image_path).stem}_analyzed{Path(args.image_path).suffix}"
        )
        logger.info(f"Saving annotated image to: {output_path}")

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        annotated_image.save(output_path)
        logger.info(f"Image saved successfully to {output_path}")

        # Print detections
        logger.info("\nDetections:")
        for i, elem in enumerate(elements):
            if isinstance(elem, IconElement):
                logger.info(
                    f"Interactive element {i}: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
                )
            elif isinstance(elem, TextElement):
                logger.info(f"Text {i}: '{elem.content}', bbox={elem.bbox.coordinates}")

    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        import traceback

        logger.error(traceback.format_exc())
        return 1

    return 0


if __name__ == "__main__":
    import sys

    sys.exit(main())

```
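
For orientation, a minimal round-trip through `OmniParser.parse` above, as a sketch (the screenshot path is an illustrative assumption):

```python
# Hedged sketch: parse raw PNG bytes and save the annotated overlay.
import base64

from som import OmniParser

parser = OmniParser()  # loads the YOLO detector (device auto-selected)
with open("screenshot.png", "rb") as f:  # assumed input file
    result = parser.parse(f.read(), box_threshold=0.3, iou_threshold=0.1, use_ocr=True)

print(
    f"{result.metadata.num_icons} icons, {result.metadata.num_text} texts "
    f"in {result.metadata.latency:.2f}s on {result.metadata.device}"
)
with open("screenshot_analyzed.png", "wb") as f:
    f.write(base64.b64decode(result.annotated_image_base64))
```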

--------------------------------------------------------------------------------
/libs/python/agent/agent/cli.py:
--------------------------------------------------------------------------------

```python
"""
CLI chat interface for agent - Computer Use Agent

Usage:
    python -m agent.cli <model_string>

Examples:
    python -m agent.cli openai/computer-use-preview
    python -m agent.cli anthropic/claude-sonnet-4-5-20250929
    python -m agent.cli omniparser+anthropic/claude-sonnet-4-5-20250929
"""

try:
    import argparse
    import asyncio
    import base64
    import json
    import os
    import platform
    import sys
    import time
    from pathlib import Path
    from typing import Any, Dict, List

    import dotenv

    try:
        from PIL import Image, ImageDraw

        PIL_AVAILABLE = True
    except Exception:
        PIL_AVAILABLE = False
    from yaspin import yaspin
except ImportError:
    if __name__ == "__main__":
        raise ImportError(
            "CLI dependencies not found. " 'Please install with: pip install "cua-agent[cli]"'
        )

# Load environment variables
dotenv.load_dotenv()


# Color codes for terminal output
class Colors:
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    # Text colors
    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"
    WHITE = "\033[37m"
    GRAY = "\033[90m"

    # Background colors
    BG_RED = "\033[41m"
    BG_GREEN = "\033[42m"
    BG_YELLOW = "\033[43m"
    BG_BLUE = "\033[44m"


def print_colored(
    text: str,
    color: str = "",
    bold: bool = False,
    dim: bool = False,
    end: str = "\n",
    right: str = "",
):
    """Print colored text to terminal with optional right-aligned text."""
    prefix = ""
    if bold:
        prefix += Colors.BOLD
    if dim:
        prefix += Colors.DIM
    if color:
        prefix += color

    if right:
        # Get terminal width (default to 80 if unable to determine)
        try:
            import shutil

            terminal_width = shutil.get_terminal_size().columns
        except Exception:
            terminal_width = 80

        # Add right margin
        terminal_width -= 1

        # Calculate padding needed
        # Account for ANSI escape codes not taking visual space
        visible_left_len = len(text)
        visible_right_len = len(right)
        padding = terminal_width - visible_left_len - visible_right_len

        if padding > 0:
            output = f"{prefix}{text}{' ' * padding}{right}{Colors.RESET}"
        else:
            # If not enough space, just put a single space between
            output = f"{prefix}{text} {right}{Colors.RESET}"
    else:
        output = f"{prefix}{text}{Colors.RESET}"

    print(output, end=end)


def print_action(action_type: str, details: Dict[str, Any], total_cost: float):
    """Print computer action with nice formatting."""
    # Format action details
    args_str = ""
    if action_type == "click" and "x" in details and "y" in details:
        args_str = f"_{details.get('button', 'left')}({details['x']}, {details['y']})"
    elif action_type == "type" and "text" in details:
        text = details["text"]
        if len(text) > 50:
            text = text[:47] + "..."
        args_str = f'("{text}")'
    elif action_type == "key" and "text" in details:
        args_str = f"('{details['text']}')"
    elif action_type == "scroll" and "x" in details and "y" in details:
        args_str = f"({details['x']}, {details['y']})"

    if total_cost > 0:
        print_colored(f"🛠️  {action_type}{args_str}", dim=True, right=f"💸 ${total_cost:.2f}")
    else:
        print_colored(f"🛠️  {action_type}{args_str}", dim=True)


def print_welcome(model: str, agent_loop: str, container_name: str):
    """Print welcome message."""
    print_colored(f"Connected to {container_name} ({model}, {agent_loop})")
    print_colored("Type 'exit' to quit.", dim=True)


async def ainput(prompt: str = ""):
    return await asyncio.to_thread(input, prompt)


async def chat_loop(
    agent, model: str, container_name: str, initial_prompt: str = "", show_usage: bool = True
):
    """Main chat loop with the agent."""
    print_welcome(model, agent.agent_config_info.agent_class.__name__, container_name)

    history = []

    if initial_prompt:
        history.append({"role": "user", "content": initial_prompt})

    total_cost = 0

    while True:
        if len(history) == 0 or history[-1].get("role") != "user":
            # Get user input with prompt
            print_colored("> ", end="")
            user_input = await ainput()

            if user_input.lower() in ["exit", "quit", "q"]:
                print_colored("\n👋 Goodbye!")
                break

            if not user_input:
                continue

            # Add user message to history
            history.append({"role": "user", "content": user_input})

        # Stream responses from the agent with spinner
        with yaspin(text="Thinking...", spinner="line", attrs=["dark"]) as spinner:
            spinner.hide()

            async for result in agent.run(history):
                # Add agent responses to history
                history.extend(result.get("output", []))

                if show_usage:
                    total_cost += result.get("usage", {}).get("response_cost", 0)

                # Process and display the output
                for item in result.get("output", []):
                    if item.get("type") == "message" and item.get("role") == "assistant":
                        # Display agent text response
                        content = item.get("content", [])
                        for content_part in content:
                            if content_part.get("text"):
                                text = content_part.get("text", "").strip()
                                if text:
                                    spinner.hide()
                                    print_colored(text)

                    elif item.get("type") == "computer_call":
                        # Display computer action
                        action = item.get("action", {})
                        action_type = action.get("type", "")
                        if action_type:
                            spinner.hide()
                            print_action(action_type, action, total_cost)
                            spinner.text = f"Performing {action_type}..."
                            spinner.show()

                    elif item.get("type") == "function_call":
                        # Display function call
                        function_name = item.get("name", "")
                        spinner.hide()
                        print_colored(f"🔧 Calling function: {function_name}", dim=True)
                        spinner.text = f"Calling {function_name}..."
                        spinner.show()

                    elif item.get("type") == "function_call_output":
                        # Display function output (dimmed)
                        output = item.get("output", "")
                        if output and len(output.strip()) > 0:
                            spinner.hide()
                            print_colored(f"📤 {output}", dim=True)

            spinner.hide()
            if show_usage and total_cost > 0:
                print_colored(f"Total cost: ${total_cost:.2f}", dim=True)


async def main():
    """Main CLI function."""
    parser = argparse.ArgumentParser(
        description="CUA Agent CLI - Interactive computer use assistant",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python -m agent.cli openai/computer-use-preview
  python -m agent.cli anthropic/claude-sonnet-4-5-20250929
  python -m agent.cli omniparser+anthropic/claude-sonnet-4-5-20250929
  python -m agent.cli huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B
        """,
    )

    parser.add_argument(
        "model",
        help="Model string (e.g., 'openai/computer-use-preview', 'anthropic/claude-sonnet-4-5-20250929')",
    )

    parser.add_argument(
        "--provider",
        choices=["cloud", "lume", "winsandbox", "docker"],
        default="cloud",
        help="Computer provider to use: cloud (default), lume, winsandbox, or docker",
    )

    parser.add_argument(
        "--images",
        type=int,
        default=3,
        help="Number of recent images to keep in context (default: 3)",
    )

    parser.add_argument("--trajectory", action="store_true", help="Save trajectory for debugging")

    parser.add_argument("--budget", type=float, help="Maximum budget for the session (in dollars)")

    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")

    parser.add_argument(
        "-p",
        "--prompt",
        type=str,
        help="Initial prompt to send to the agent. Leave blank for interactive mode.",
    )

    parser.add_argument(
        "--prompt-file",
        type=Path,
        help="Path to a UTF-8 text file whose contents will be used as the initial prompt. If provided, overrides --prompt.",
    )

    parser.add_argument(
        "--predict-click",
        dest="predict_click",
        type=str,
        help="Instruction for click prediction. If set, runs predict_click, draws crosshair on a fresh screenshot, saves and opens it.",
    )

    parser.add_argument("-c", "--cache", action="store_true", help="Tell the API to enable caching")

    parser.add_argument(
        "-u", "--usage", action="store_true", help="Show total cost of the agent runs"
    )

    parser.add_argument(
        "-r",
        "--max-retries",
        type=int,
        default=3,
        help="Maximum number of retries for the LLM API calls",
    )

    # Provider override credentials
    parser.add_argument(
        "--api-key",
        dest="api_key",
        type=str,
        help="API key override for the model provider (passed to ComputerAgent)",
    )
    parser.add_argument(
        "--api-base",
        dest="api_base",
        type=str,
        help="API base URL override for the model provider (passed to ComputerAgent)",
    )

    args = parser.parse_args()

    # Check for required environment variables
    container_name = os.getenv("CUA_CONTAINER_NAME")
    cua_api_key = os.getenv("CUA_API_KEY")

    # Prompt for missing environment variables (container name always required)
    if not container_name:
        if args.provider == "cloud":
            print_colored("CUA_CONTAINER_NAME not set.", dim=True)
            print_colored("You can get a CUA container at https://cua.ai/", dim=True)
            container_name = input("Enter your CUA container name: ").strip()
            if not container_name:
                print_colored("❌ Container name is required.")
                sys.exit(1)
        else:
            container_name = "cli-sandbox"

    # Only require API key for cloud provider
    if args.provider == "cloud" and not cua_api_key:
        print_colored("CUA_API_KEY not set.", dim=True)
        cua_api_key = input("Enter your CUA API key: ").strip()
        if not cua_api_key:
            print_colored("❌ API key is required for cloud provider.")
            sys.exit(1)

    # Check for provider-specific API keys based on model
    provider_api_keys = {
        "openai/": "OPENAI_API_KEY",
        "anthropic/": "ANTHROPIC_API_KEY",
    }

    # Find matching provider and check for API key
    for prefix, env_var in provider_api_keys.items():
        if prefix in args.model:
            if not os.getenv(env_var):
                print_colored(f"{env_var} not set.", dim=True)
                api_key = input(f"Enter your {env_var.replace('_', ' ').title()}: ").strip()
                if not api_key:
                    print_colored(f"❌ {env_var.replace('_', ' ').title()} is required.")
                    sys.exit(1)
                # Set the environment variable for the session
                os.environ[env_var] = api_key
            break

    # Import here to avoid import errors if dependencies are missing
    try:
        from agent import ComputerAgent
        from computer import Computer
    except ImportError as e:
        print_colored(f"❌ Import error: {e}", Colors.RED, bold=True)
        print_colored("Make sure agent and computer libraries are installed.", Colors.YELLOW)
        sys.exit(1)

    # Resolve provider -> os_type, provider_type, api key requirement
    provider_map = {
        "cloud": ("linux", "cloud", True),
        "lume": ("macos", "lume", False),
        "winsandbox": ("windows", "winsandbox", False),
        "docker": ("linux", "docker", False),
    }
    os_type, provider_type, needs_api_key = provider_map[args.provider]

    computer_kwargs = {
        "os_type": os_type,
        "provider_type": provider_type,
        "name": container_name,
    }
    if needs_api_key:
        computer_kwargs["api_key"] = cua_api_key  # type: ignore

    # Create computer instance
    async with Computer(**computer_kwargs) as computer:  # type: ignore

        # Create agent
        agent_kwargs = {
            "model": args.model,
            "tools": [computer],
            "trust_remote_code": True,  # needed for some local models (e.g., InternVL, OpenCUA)
            "verbosity": 20 if args.verbose else 30,  # DEBUG vs WARNING
            "max_retries": args.max_retries,
        }

        # Thread API credentials to agent if provided
        if args.api_key:
            agent_kwargs["api_key"] = args.api_key
        if args.api_base:
            agent_kwargs["api_base"] = args.api_base

        if args.images > 0:
            agent_kwargs["only_n_most_recent_images"] = args.images

        if args.trajectory:
            agent_kwargs["trajectory_dir"] = "trajectories"

        if args.budget:
            agent_kwargs["max_trajectory_budget"] = {
                "max_budget": args.budget,
                "raise_error": True,
                "reset_after_each_run": False,
            }

        if args.cache:
            agent_kwargs["use_prompt_caching"] = True

        agent = ComputerAgent(**agent_kwargs)

        # If predict-click mode is requested, run once and exit
        if args.predict_click:
            if not PIL_AVAILABLE:
                print_colored(
                    "❌ Pillow (PIL) is required for --predict-click visualization. Install with: pip install pillow",
                    Colors.RED,
                    bold=True,
                )
                sys.exit(1)

            instruction = args.predict_click
            print_colored(f"Predicting click for: '{instruction}'", Colors.CYAN)

            # Take a fresh screenshot FIRST
            try:
                img_bytes = await computer.interface.screenshot()
            except Exception as e:
                print_colored(f"❌ Failed to take screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            # Encode screenshot to base64 for predict_click
            try:
                image_b64 = base64.b64encode(img_bytes).decode("utf-8")
            except Exception as e:
                print_colored(f"❌ Failed to encode screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            try:
                coords = await agent.predict_click(instruction, image_b64=image_b64)
            except Exception as e:
                print_colored(f"❌ predict_click failed: {e}", Colors.RED, bold=True)
                sys.exit(1)

            if not coords:
                print_colored("⚠️  No coordinates returned.", Colors.YELLOW)
                sys.exit(2)

            x, y = coords
            print_colored(f"✅ Predicted coordinates: ({x}, {y})", Colors.GREEN)

            try:
                from io import BytesIO

                with Image.open(BytesIO(img_bytes)) as img:
                    img = img.convert("RGB")
                    draw = ImageDraw.Draw(img)
                    # Draw crosshair
                    size = 12
                    color = (255, 0, 0)
                    draw.line([(x - size, y), (x + size, y)], fill=color, width=3)
                    draw.line([(x, y - size), (x, y + size)], fill=color, width=3)
                    # Optional small circle
                    r = 6
                    draw.ellipse([(x - r, y - r), (x + r, y + r)], outline=color, width=2)

                    out_path = Path.cwd() / f"predict_click_{int(time.time())}.png"
                    img.save(out_path)
                    print_colored(f"🖼️  Saved to {out_path}")

                    # Open the image with default viewer
                    try:
                        system = platform.system().lower()
                        if system == "windows":
                            os.startfile(str(out_path))  # type: ignore[attr-defined]
                        elif system == "darwin":
                            os.system(f'open "{out_path}"')
                        else:
                            os.system(f'xdg-open "{out_path}"')
                    except Exception:
                        pass
            except Exception as e:
                print_colored(f"❌ Failed to render/save screenshot: {e}", Colors.RED, bold=True)
                sys.exit(1)

            # Done
            sys.exit(0)

        # Resolve initial prompt from --prompt-file or --prompt
        initial_prompt = args.prompt or ""
        if args.prompt_file:
            try:
                initial_prompt = args.prompt_file.read_text(encoding="utf-8")
            except Exception as e:
                print_colored(f"❌ Failed to read --prompt-file: {e}", Colors.RED, bold=True)
                sys.exit(1)

        # Start chat loop (default interactive mode)
        await chat_loop(agent, args.model, container_name, initial_prompt, args.usage)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, EOFError) as _:
        print_colored("\n\n👋 Goodbye!")

```
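
The CLI above is a thin wrapper around `ComputerAgent`; a rough programmatic equivalent of its cloud-provider path, as a sketch under the same env-var assumptions (`CUA_CONTAINER_NAME`, `CUA_API_KEY`):

```python
# Hedged sketch of what agent.cli wires together for --provider cloud.
import asyncio
import os

from agent import ComputerAgent
from computer import Computer


async def run() -> None:
    async with Computer(
        os_type="linux",
        provider_type="cloud",
        name=os.environ["CUA_CONTAINER_NAME"],
        api_key=os.environ["CUA_API_KEY"],
    ) as computer:
        agent = ComputerAgent(
            model="anthropic/claude-sonnet-4-5-20250929",  # any supported model string
            tools=[computer],
            only_n_most_recent_images=3,
        )
        history = [{"role": "user", "content": "Open the calculator"}]
        async for result in agent.run(history):
            history.extend(result.get("output", []))


asyncio.run(run())
```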

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/moondream3.py:
--------------------------------------------------------------------------------

```python
"""
Moondream3+ composed-grounded agent loop implementation.
Grounding is handled by a local Moondream3 preview model via Transformers.
Thinking is delegated to the trailing LLM in the composed model string: "moondream3+<thinking_model>".

Differences from composed_grounded:
- Provides a singleton Moondream3 client outside the class.
- predict_click uses model.point(image, instruction, settings={"max_objects": 1}) and returns pixel coordinates.
- If the last image was a screenshot (or we take one), run model.detect(image, "all ui elements") to get bboxes, then
  caption each cropped bbox via model.query to label it. Overlay labels on the screenshot and emit via _on_screenshot.
- Add a user message listing all detected form UI names so the thinker can reference them.
- If the thinking model doesn't support vision, filter out image content before calling litellm.
"""

from __future__ import annotations

import base64
import io
import uuid
from typing import Any, Dict, List, Optional, Tuple

import litellm
from PIL import Image, ImageDraw, ImageFont

from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
    convert_completion_messages_to_responses_items,
    convert_computer_calls_desc2xy,
    convert_computer_calls_xy2desc,
    convert_responses_items_to_completion_messages,
    get_all_element_descriptions,
)
from ..types import AgentCapability

_MOONDREAM_SINGLETON = None


def get_moondream_model() -> Any:
    """Get a singleton instance of the Moondream3 preview model."""
    global _MOONDREAM_SINGLETON
    if _MOONDREAM_SINGLETON is None:
        try:
            import torch
            from transformers import AutoModelForCausalLM

            _MOONDREAM_SINGLETON = AutoModelForCausalLM.from_pretrained(
                "moondream/moondream3-preview",
                trust_remote_code=True,
                torch_dtype=torch.bfloat16,
                device_map="cuda",
            )
        except ImportError as e:
            raise RuntimeError(
                "moondream3 requires torch and transformers. Install with: pip install cua-agent[moondream3]"
            ) from e
    return _MOONDREAM_SINGLETON


def _decode_image_b64(image_b64: str) -> Image.Image:
    data = base64.b64decode(image_b64)
    return Image.open(io.BytesIO(data)).convert("RGB")


def _image_to_b64(img: Image.Image) -> str:
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def _supports_vision(model: str) -> bool:
    """Heuristic vision support detection for thinking model."""
    m = model.lower()
    vision_markers = [
        "gpt-4o",
        "gpt-4.1",
        "o1",
        "o3",
        "claude-3",
        "claude-3.5",
        "sonnet",
        "haiku",
        "opus",
        "gemini-1.5",
        "llava",
    ]
    return any(v in m for v in vision_markers)
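    # e.g. _supports_vision("openai/gpt-4o") -> True ("gpt-4o" marker),
    # while _supports_vision("deepseek-chat") -> False (no marker present).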


def _filter_images_from_completion_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    filtered: List[Dict[str, Any]] = []
    for msg in messages:
        msg_copy = {**msg}
        content = msg_copy.get("content")
        if isinstance(content, list):
            msg_copy["content"] = [c for c in content if c.get("type") != "image_url"]
        filtered.append(msg_copy)
    return filtered


def _annotate_detect_and_label_ui(base_img: Image.Image, model_md) -> Tuple[str, List[str]]:
    """Detect UI elements with Moondream, caption each, draw labels with backgrounds.

    Args:
        base_img: PIL image of the screenshot (RGB or RGBA). Will be copied/converted internally.
        model_md: Moondream model instance with .detect() and .query() methods.

    Returns:
        A tuple of (annotated_image_base64_png, detected_names)
    """
    # Ensure RGBA for semi-transparent fills
    if base_img.mode != "RGBA":
        base_img = base_img.convert("RGBA")
    W, H = base_img.width, base_img.height

    # Detect objects
    try:
        detect_result = model_md.detect(base_img, "all ui elements")
        objects = detect_result.get("objects", []) if isinstance(detect_result, dict) else []
    except Exception:
        objects = []

    draw = ImageDraw.Draw(base_img)
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None

    detected_names: List[str] = []

    for i, obj in enumerate(objects):
        try:
            # Clamp normalized coords and crop
            x_min = max(0.0, min(1.0, float(obj.get("x_min", 0.0))))
            y_min = max(0.0, min(1.0, float(obj.get("y_min", 0.0))))
            x_max = max(0.0, min(1.0, float(obj.get("x_max", 0.0))))
            y_max = max(0.0, min(1.0, float(obj.get("y_max", 0.0))))
            left, top, right, bottom = (
                int(x_min * W),
                int(y_min * H),
                int(x_max * W),
                int(y_max * H),
            )
            left, top = max(0, left), max(0, top)
            right, bottom = min(W - 1, right), min(H - 1, bottom)
            crop = base_img.crop((left, top, right, bottom))

            # Prompted short caption
            try:
                result = model_md.query(crop, "Caption this UI element in few words.")
                caption_text = (result or {}).get("answer", "")
            except Exception:
                caption_text = ""

            name = (caption_text or "").strip() or f"element_{i+1}"
            detected_names.append(name)

            # Draw bbox
            draw.rectangle([left, top, right, bottom], outline=(255, 215, 0, 255), width=2)

            # Label background with padding and rounded corners
            label = f"{i+1}. {name}"
            padding = 3
            if font:
                text_bbox = draw.textbbox((0, 0), label, font=font)
            else:
                text_bbox = draw.textbbox((0, 0), label)
            text_w = text_bbox[2] - text_bbox[0]
            text_h = text_bbox[3] - text_bbox[1]

            tx = left + 3
            ty = top - (text_h + 2 * padding + 4)
            if ty < 0:
                ty = top + 3

            bg_left = tx - padding
            bg_top = ty - padding
            bg_right = tx + text_w + padding
            bg_bottom = ty + text_h + padding
            try:
                draw.rounded_rectangle(
                    [bg_left, bg_top, bg_right, bg_bottom],
                    radius=4,
                    fill=(0, 0, 0, 160),
                    outline=(255, 215, 0, 200),
                    width=1,
                )
            except Exception:
                draw.rectangle(
                    [bg_left, bg_top, bg_right, bg_bottom],
                    fill=(0, 0, 0, 160),
                    outline=(255, 215, 0, 200),
                    width=1,
                )

            text_fill = (255, 255, 255, 255)
            if font:
                draw.text((tx, ty), label, fill=text_fill, font=font)
            else:
                draw.text((tx, ty), label, fill=text_fill)
        except Exception:
            continue

    # Encode PNG base64
    # base_img is guaranteed RGBA at this point, which PNG encodes directly
    annotated_b64 = _image_to_b64(base_img)
    return annotated_b64, detected_names


GROUNDED_COMPUTER_TOOL_SCHEMA = {
    "type": "function",
    "function": {
        "name": "computer",
        "description": (
            "Control a computer by taking screenshots and interacting with UI elements. "
            "The screenshot action will include a list of detected form UI element names when available. "
            "Use element descriptions to locate and interact with UI elements on the screen."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "enum": [
                        "screenshot",
                        "click",
                        "double_click",
                        "drag",
                        "type",
                        "keypress",
                        "scroll",
                        "move",
                        "wait",
                        "get_current_url",
                        "get_dimensions",
                        "get_environment",
                    ],
                    "description": "The action to perform (required for all actions)",
                },
                "element_description": {
                    "type": "string",
                    "description": "Description of the element to interact with (required for click/double_click/move/scroll)",
                },
                "start_element_description": {
                    "type": "string",
                    "description": "Description of the element to start dragging from (required for drag)",
                },
                "end_element_description": {
                    "type": "string",
                    "description": "Description of the element to drag to (required for drag)",
                },
                "text": {
                    "type": "string",
                    "description": "The text to type (required for type)",
                },
                "keys": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Key(s) to press (required for keypress)",
                },
                "button": {
                    "type": "string",
                    "enum": ["left", "right", "wheel", "back", "forward"],
                    "description": "The mouse button to use for click/double_click",
                },
                "scroll_x": {
                    "type": "integer",
                    "description": "Horizontal scroll amount (required for scroll)",
                },
                "scroll_y": {
                    "type": "integer",
                    "description": "Vertical scroll amount (required for scroll)",
                },
            },
            "required": ["action"],
        },
    },
}
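
# Example tool calls a thinking model might emit against this schema (illustrative
# arguments, not taken from a real trace):
#   {"action": "screenshot"}
#   {"action": "click", "element_description": "blue 'Submit' button", "button": "left"}
#   {"action": "drag", "start_element_description": "file icon",
#    "end_element_description": "trash can icon"}
#   {"action": "keypress", "keys": ["ctrl", "s"]}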


@register_agent(r"moondream3\+.*", priority=2)
class Moondream3PlusConfig(AsyncAgentConfig):
    def __init__(self):
        self.desc2xy: Dict[str, Tuple[float, float]] = {}

    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs,
    ) -> Dict[str, Any]:
        # Parse composed model: moondream3+<thinking_model>
        if "+" not in model:
            raise ValueError(f"Composed model must be 'moondream3+<thinking_model>', got: {model}")
        _, thinking_model = model.split("+", 1)
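        # e.g. "moondream3+openai/gpt-4o" (illustrative) -> Moondream grounds clicks
        # while "openai/gpt-4o" does the reasoning through litellm below.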

        pre_output_items: List[Dict[str, Any]] = []

        # Acquire last screenshot; if missing, take one
        last_image_b64: Optional[str] = None
        for message in reversed(messages):
            if (
                isinstance(message, dict)
                and message.get("type") == "computer_call_output"
                and isinstance(message.get("output"), dict)
                and message["output"].get("type") == "input_image"
            ):
                image_url = message["output"].get("image_url", "")
                if image_url.startswith("data:image/png;base64,"):
                    last_image_b64 = image_url.split(",", 1)[1]
                    break

        if last_image_b64 is None and computer_handler is not None:
            # Take a screenshot
            screenshot_b64 = await computer_handler.screenshot()  # type: ignore
            if screenshot_b64:
                call_id = uuid.uuid4().hex
                pre_output_items += [
                    {
                        "type": "message",
                        "role": "assistant",
                        "content": [
                            {
                                "type": "output_text",
                                "text": "Taking a screenshot to analyze the current screen.",
                            }
                        ],
                    },
                    {
                        "type": "computer_call",
                        "call_id": call_id,
                        "status": "completed",
                        "action": {"type": "screenshot"},
                    },
                    {
                        "type": "computer_call_output",
                        "call_id": call_id,
                        "output": {
                            "type": "input_image",
                            "image_url": f"data:image/png;base64,{screenshot_b64}",
                        },
                    },
                ]
                last_image_b64 = screenshot_b64
                if _on_screenshot:
                    await _on_screenshot(screenshot_b64)

        # If we have a last screenshot, run Moondream detection and labeling
        detected_names: List[str] = []
        if last_image_b64 is not None:
            base_img = _decode_image_b64(last_image_b64)
            model_md = get_moondream_model()
            annotated_b64, detected_names = _annotate_detect_and_label_ui(base_img, model_md)
            if _on_screenshot:
                await _on_screenshot(annotated_b64, "annotated_form_ui")

            # Also push a user message listing all detected names
            if detected_names:
                names_text = "\n".join(f"- {n}" for n in detected_names)
                pre_output_items.append(
                    {
                        "type": "message",
                        "role": "user",
                        "content": [
                            {"type": "input_text", "text": "Detected form UI elements on screen:"},
                            {"type": "input_text", "text": names_text},
                            {
                                "type": "input_text",
                                "text": "Please continue with the next action needed to perform your task.",
                            },
                        ],
                    }
                )

        tool_schemas = []
        for schema in tools or []:
            if schema.get("type") == "computer":
                tool_schemas.append(GROUNDED_COMPUTER_TOOL_SCHEMA)
            else:
                tool_schemas.append(schema)

        # Step 1: Convert computer calls from xy to descriptions
        input_messages = messages + pre_output_items
        messages_with_descriptions = convert_computer_calls_xy2desc(input_messages, self.desc2xy)

        # Step 2: Convert responses items to completion messages
        completion_messages = convert_responses_items_to_completion_messages(
            messages_with_descriptions,
            allow_images_in_tool_results=False,
        )

        # Strip image parts when the thinking model lacks vision support
        if not _supports_vision(thinking_model):
            completion_messages = _filter_images_from_completion_messages(completion_messages)

        # Step 3: Call thinking model with litellm.acompletion
        api_kwargs = {
            "model": thinking_model,
            "messages": completion_messages,
            "tools": tool_schemas,
            "max_retries": max_retries,
            "stream": stream,
            **kwargs,
        }
        if use_prompt_caching:
            api_kwargs["use_prompt_caching"] = use_prompt_caching

        if _on_api_start:
            await _on_api_start(api_kwargs)

        response = await litellm.acompletion(**api_kwargs)

        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        usage = {
            **response.usage.model_dump(),  # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0),
        }
        if _on_usage:
            await _on_usage(usage)

        # Step 4: Convert completion messages back to responses items format
        response_dict = response.model_dump()  # type: ignore
        choice_messages = [choice["message"] for choice in response_dict["choices"]]
        thinking_output_items: List[Dict[str, Any]] = []
        for choice_message in choice_messages:
            thinking_output_items.extend(
                convert_completion_messages_to_responses_items([choice_message])
            )

        # Step 5: Use Moondream to get coordinates for each description
        element_descriptions = get_all_element_descriptions(thinking_output_items)
        if element_descriptions and last_image_b64:
            for desc in element_descriptions:
                for _ in range(3):  # try 3 times
                    coords = await self.predict_click(
                        model=model,
                        image_b64=last_image_b64,
                        instruction=desc,
                    )
                    if coords:
                        self.desc2xy[desc] = coords
                        break

        # Step 6: Convert computer calls from descriptions back to xy coordinates
        final_output_items = convert_computer_calls_desc2xy(thinking_output_items, self.desc2xy)

        # Step 7: Return output and usage
        return {"output": pre_output_items + final_output_items, "usage": usage}

    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs,
    ) -> Optional[Tuple[float, float]]:
        """Predict click coordinates using Moondream3's point API.

        Returns pixel coordinates (x, y) as floats.
        """
        img = _decode_image_b64(image_b64)
        W, H = img.width, img.height
        model_md = get_moondream_model()
        try:
            result = model_md.point(img, instruction, settings={"max_objects": 1})
        except Exception:
            return None

        try:
            pt = (result or {}).get("points", [])[0]
            x_norm = float(pt.get("x", 0.0))
            y_norm = float(pt.get("y", 0.0))
            x_px = max(0.0, min(float(W - 1), x_norm * W))
            y_px = max(0.0, min(float(H - 1), y_norm * H))
            return (x_px, y_px)
        except Exception:
            return None

    def get_capabilities(self) -> List[AgentCapability]:
        return ["click", "step"]

```

--------------------------------------------------------------------------------
/docs/src/app/(home)/[[...slug]]/page.tsx:
--------------------------------------------------------------------------------

```typescript
import { getApiVersions, source } from '@/lib/source';
import { getMDXComponents } from '@/mdx-components';
import { buttonVariants } from 'fumadocs-ui/components/ui/button';
import { Popover, PopoverContent, PopoverTrigger } from 'fumadocs-ui/components/ui/popover';
import { createRelativeLink } from 'fumadocs-ui/mdx';
import { DocsBody, DocsDescription, DocsPage, DocsTitle } from 'fumadocs-ui/page';
import { cn } from 'fumadocs-ui/utils/cn';
import { ChevronDown, CodeXml, ExternalLink } from 'lucide-react';
import type { Metadata } from 'next';
import Link from 'next/link';
import { notFound } from 'next/navigation';
import { PageFeedback } from '@/components/page-feedback';
import { DocActionsMenu } from '@/components/doc-actions-menu';

export default async function Page(props: { params: Promise<{ slug?: string[] }> }) {
  const params = await props.params;
  const slug = params.slug || [];

  const page = source.getPage(slug);
  if (!page) notFound();

  // Detect if this is an API reference page: /api/[section] or /api/[section]/[version]
  let apiSection: string | null = null;
  let apiVersionSlug: string[] = [];
  if (slug[0] === 'api' && slug.length >= 2) {
    apiSection = slug[1];
    if (slug.length > 2) {
      apiVersionSlug = slug.slice(2);
    }
  }
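  // e.g. (illustrative slugs): /api/computer -> apiSection = 'computer',
  // apiVersionSlug = []; /api/computer/v1 -> apiVersionSlug = ['v1'].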

  let versionItems: { label: string; slug: string[] }[] = [];
  if (apiSection) {
    versionItems = await getApiVersions(apiSection);
  }

  const macos = page.data.macos;
  const windows = page.data.windows;
  const linux = page.data.linux;
  const pypi = page.data.pypi;
  const npm = page.data.npm;
  const github = page.data.github;

  const MDXContent = page.data.body;

  // Platform icons component
  const PlatformIcons = () => {
    const hasAnyPlatform = macos || windows || linux;
    if (!hasAnyPlatform && !pypi && !npm) return null;

    return (
      <div className="flex flex-col gap-2">
        {hasAnyPlatform && (
          <div className="flex flex-row gap-2 items-left dark:text-neutral-400">
            {windows && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 448 512"
              >
                <title>Windows</title>
                <path d="M0 93.7l183.6-25.3v177.4H0V93.7zm0 324.6l183.6 25.3V268.4H0v149.9zm203.8 28L448 480V268.4H203.8v177.9zm0-380.6v180.1H448V32L203.8 65.7z" />
              </svg>
            )}
            {macos && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 384 512"
              >
                <title>macOS</title>
                <path d="M318.7 268.7c-.2-36.7 16.4-64.4 50-84.8-18.8-26.9-47.2-41.7-84.7-44.6-35.5-2.8-74.3 20.7-88.5 20.7-15 0-49.4-19.7-76.4-19.7C63.3 141.2 4 184.8 4 273.5q0 39.3 14.4 81.2c12.8 36.7 59 126.7 107.2 125.2 25.2-.6 43-17.9 75.8-17.9 31.8 0 48.3 17.9 76.4 17.9 48.6-.7 90.4-82.5 102.6-119.3-65.2-30.7-61.7-90-61.7-91.9zm-56.6-164.2c27.3-32.4 24.8-61.9 24-72.5-24.1 1.4-52 16.4-67.9 34.9-17.5 19.8-27.8 44.3-25.6 71.9 26.1 2 49.9-11.4 69.5-34.3z" />
              </svg>
            )}
            {linux && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 448 512"
              >
                <title>Linux</title>
                <path d="M220.8 123.3c1 .5 1.8 1.7 3 1.7 1.1 0 2.8-.4 2.9-1.5 .2-1.4-1.9-2.3-3.2-2.9-1.7-.7-3.9-1-5.5-.1-.4 .2-.8 .7-.6 1.1 .3 1.3 2.3 1.1 3.4 1.7zm-21.9 1.7c1.2 0 2-1.2 3-1.7 1.1-.6 3.1-.4 3.5-1.6 .2-.4-.2-.9-.6-1.1-1.6-.9-3.8-.6-5.5 .1-1.3 .6-3.4 1.5-3.2 2.9 .1 1 1.8 1.5 2.8 1.4zM420 403.8c-3.6-4-5.3-11.6-7.2-19.7-1.8-8.1-3.9-16.8-10.5-22.4-1.3-1.1-2.6-2.1-4-2.9-1.3-.8-2.7-1.5-4.1-2 9.2-27.3 5.6-54.5-3.7-79.1-11.4-30.1-31.3-56.4-46.5-74.4-17.1-21.5-33.7-41.9-33.4-72C311.1 85.4 315.7 .1 234.8 0 132.4-.2 158 103.4 156.9 135.2c-1.7 23.4-6.4 41.8-22.5 64.7-18.9 22.5-45.5 58.8-58.1 96.7-6 17.9-8.8 36.1-6.2 53.3-6.5 5.8-11.4 14.7-16.6 20.2-4.2 4.3-10.3 5.9-17 8.3s-14 6-18.5 14.5c-2.1 3.9-2.8 8.1-2.8 12.4 0 3.9 .6 7.9 1.2 11.8 1.2 8.1 2.5 15.7 .8 20.8-5.2 14.4-5.9 24.4-2.2 31.7 3.8 7.3 11.4 10.5 20.1 12.3 17.3 3.6 40.8 2.7 59.3 12.5 19.8 10.4 39.9 14.1 55.9 10.4 11.6-2.6 21.1-9.6 25.9-20.2 12.5-.1 26.3-5.4 48.3-6.6 14.9-1.2 33.6 5.3 55.1 4.1 .6 2.3 1.4 4.6 2.5 6.7v.1c8.3 16.7 23.8 24.3 40.3 23 16.6-1.3 34.1-11 48.3-27.9 13.6-16.4 36-23.2 50.9-32.2 7.4-4.5 13.4-10.1 13.9-18.3 .4-8.2-4.4-17.3-15.5-29.7zM223.7 87.3c9.8-22.2 34.2-21.8 44-.4 6.5 14.2 3.6 30.9-4.3 40.4-1.6-.8-5.9-2.6-12.6-4.9 1.1-1.2 3.1-2.7 3.9-4.6 4.8-11.8-.2-27-9.1-27.3-7.3-.5-13.9 10.8-11.8 23-4.1-2-9.4-3.5-13-4.4-1-6.9-.3-14.6 2.9-21.8zM183 75.8c10.1 0 20.8 14.2 19.1 33.5-3.5 1-7.1 2.5-10.2 4.6 1.2-8.9-3.3-20.1-9.6-19.6-8.4 .7-9.8 21.2-1.8 28.1 1 .8 1.9-.2-5.9 5.5-15.6-14.6-10.5-52.1 8.4-52.1zm-13.6 60.7c6.2-4.6 13.6-10 14.1-10.5 4.7-4.4 13.5-14.2 27.9-14.2 7.1 0 15.6 2.3 25.9 8.9 6.3 4.1 11.3 4.4 22.6 9.3 8.4 3.5 13.7 9.7 10.5 18.2-2.6 7.1-11 14.4-22.7 18.1-11.1 3.6-19.8 16-38.2 14.9-3.9-.2-7-1-9.6-2.1-8-3.5-12.2-10.4-20-15-8.6-4.8-13.2-10.4-14.7-15.3-1.4-4.9 0-9 4.2-12.3zm3.3 334c-2.7 35.1-43.9 34.4-75.3 18-29.9-15.8-68.6-6.5-76.5-21.9-2.4-4.7-2.4-12.7 2.6-26.4v-.2c2.4-7.6 .6-16-.6-23.9-1.2-7.8-1.8-15 .9-20 3.5-6.7 8.5-9.1 14.8-11.3 10.3-3.7 11.8-3.4 19.6-9.9 5.5-5.7 9.5-12.9 14.3-18 5.1-5.5 10-8.1 17.7-6.9 8.1 1.2 15.1 6.8 21.9 16l19.6 35.6c9.5 19.9 43.1 48.4 41 68.9zm-1.4-25.9c-4.1-6.6-9.6-13.6-14.4-19.6 7.1 0 14.2-2.2 16.7-8.9 2.3-6.2 0-14.9-7.4-24.9-13.5-18.2-38.3-32.5-38.3-32.5-13.5-8.4-21.1-18.7-24.6-29.9s-3-23.3-.3-35.2c5.2-22.9 18.6-45.2 27.2-59.2 2.3-1.7 .8 3.2-8.7 20.8-8.5 16.1-24.4 53.3-2.6 82.4 .6-20.7 5.5-41.8 13.8-61.5 12-27.4 37.3-74.9 39.3-112.7 1.1 .8 4.6 3.2 6.2 4.1 4.6 2.7 8.1 6.7 12.6 10.3 12.4 10 28.5 9.2 42.4 1.2 6.2-3.5 11.2-7.5 15.9-9 9.9-3.1 17.8-8.6 22.3-15 7.7 30.4 25.7 74.3 37.2 95.7 6.1 11.4 18.3 35.5 23.6 64.6 3.3-.1 7 .4 10.9 1.4 13.8-35.7-11.7-74.2-23.3-84.9-4.7-4.6-4.9-6.6-2.6-6.5 12.6 11.2 29.2 33.7 35.2 59 2.8 11.6 3.3 23.7 .4 35.7 16.4 6.8 35.9 17.9 30.7 34.8-2.2-.1-3.2 0-4.2 0 3.2-10.1-3.9-17.6-22.8-26.1-19.6-8.6-36-8.6-38.3 12.5-12.1 4.2-18.3 14.7-21.4 27.3-2.8 11.2-3.6 24.7-4.4 39.9-.5 7.7-3.6 18-6.8 29-32.1 22.9-76.7 32.9-114.3 7.2zm257.4-11.5c-.9 16.8-41.2 19.9-63.2 46.5-13.2 15.7-29.4 24.4-43.6 25.5s-26.5-4.8-33.7-19.3c-4.7-11.1-2.4-23.1 1.1-36.3 3.7-14.2 9.2-28.8 9.9-40.6 .8-15.2 1.7-28.5 4.2-38.7 2.6-10.3 6.6-17.2 13.7-21.1 .3-.2 .7-.3 1-.5 .8 13.2 7.3 26.6 18.8 29.5 12.6 3.3 30.7-7.5 38.4-16.3 9-.3 15.7-.9 22.6 5.1 9.9 8.5 7.1 30.3 17.1 41.6 10.6 11.6 14 19.5 13.7 24.6zM173.3 148.7c2 1.9 4.7 4.5 8 7.1 6.6 5.2 15.8 10.6 27.3 10.6 11.6 0 22.5-5.9 31.8-10.8 4.9-2.6 10.9-7 14.8-10.4s5.9-6.3 3.1-6.6-2.6 2.6-6 5.1c-4.4 3.2-9.7 7.4-13.9 9.8-7.4 4.2-19.5 10.2-29.9 10.2s-18.7-4.8-24.9-9.7c-3.1-2.5-5.7-5-7.7-6.9-1.5-1.4-1.9-4.6-4.3-4.9-1.4-.1-1.8 3.7 1.7 6.5z" />
              </svg>
            )}
          </div>
        )}

        <div className="flex flex-row gap-2 items-left">
          {pypi && (
            <a target="_blank" href={`https://pypi.org/project/${pypi}/`} rel="noreferrer">
              <img
                src={`https://img.shields.io/pypi/v/${pypi}?color=blue`}
                className="h-5"
                alt="PyPI"
              />
            </a>
          )}
          {npm && (
            <a target="_blank" href={`https://www.npmjs.com/package/${npm}`} rel="noreferrer">
              <img
                src={`https://img.shields.io/npm/v/${npm}?color=bf4c4b`}
                className="h-5"
                alt="NPM"
              />
            </a>
          )}
        </div>
      </div>
    );
  };

  const tocHeader = () => {
    return (
      <div className="w-fit">
        <PlatformIcons />
        <div className="flex gap-2 mt-2">
          {github &&
            github.length > 0 &&
            (github.length === 1 ? (
              <a
                href={github[0]}
                rel="noreferrer noopener"
                target="_blank"
                className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&amp;_svg]:size-5 text-fd-muted-foreground md:[&amp;_svg]:size-4.5"
                aria-label="Source"
                data-active="false"
              >
                <svg role="img" viewBox="0 0 24 24" fill="currentColor">
                  <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
                </svg>
                Source
                <ExternalLink className="w-4 h-4 ml-auto" />
              </a>
            ) : (
              <Popover>
                <PopoverTrigger className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5">
                  <svg role="img" viewBox="0 0 24 24" fill="currentColor">
                    <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
                  </svg>
                  Source
                  <ChevronDown className="h-4 w-4" />
                </PopoverTrigger>
                <PopoverContent className="w-48 p-1">
                  <div className="flex flex-col gap-1">
                    {github.map((link, index) => (
                      <a
                        key={index}
                        href={link}
                        rel="noreferrer noopener"
                        target="_blank"
                        className="inline-flex gap-2 w-full items-center rounded-md p-2 text-sm hover:bg-fd-accent hover:text-fd-accent-foreground"
                      >
                        {link.includes('python')
                          ? 'Python'
                          : link.includes('typescript')
                            ? 'TypeScript'
                            : `Source ${index + 1}`}
                        <ExternalLink className="w-4 h-4 ml-auto" />
                      </a>
                    ))}
                  </div>
                </PopoverContent>
              </Popover>
            ))}
          {/*slug.includes('libraries') && (
            <a
              className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&amp;_svg]:size-5 text-fd-muted-foreground md:[&amp;_svg]:size-4.5"
              href={`/api/${page.data.title.toLowerCase()}`}>
              <CodeXml size={12} />
              Reference
            </a>
          )*/}
        </div>
        <hr className="my-2 border-t border-fd-border" />
      </div>
    );
  };

  const tocFooter = () => {
    // Construct file path from slug
    // For root index, use 'index.mdx', otherwise join slug parts
    const filePath = slug.length === 0 ? 'index.mdx' : `${slug.join('/')}.mdx`;
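    // e.g. [] -> 'index.mdx'; ['agent-sdk', 'callbacks'] -> 'agent-sdk/callbacks.mdx'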

    return (
      <div className="mt-4">
        <DocActionsMenu pageUrl={page.url} pageTitle={page.data.title} filePath={filePath} />
      </div>
    );
  };

  return (
    <DocsPage
      toc={page.data.toc}
      tableOfContent={{ header: tocHeader(), footer: tocFooter() }}
      full={page.data.full}
    >
      <div className="flex flex-row w-full items-start">
        <div className="flex-1">
          <div className="flex flex-row w-full">
            {slug.length > 0 && <DocsTitle>{page.data.title}</DocsTitle>}

            <div className="ml-auto flex items-center gap-2">
              {apiSection && versionItems.length > 1 && (
                <Popover>
                  <PopoverTrigger
                    className={cn(
                      buttonVariants({
                        color: 'secondary',
                        size: 'sm',
                        className: 'gap-2',
                      })
                    )}
                  >
                    {(() => {
                      // Find the current version label
                      let currentLabel = 'Current';
                      if (apiVersionSlug.length > 0) {
                        const found = versionItems.find(
                          (item) => item.label !== 'Current' && apiVersionSlug[0] === item.label
                        );
                        if (found) currentLabel = found.label;
                      }
                      return (
                        <>
                          API Version: {currentLabel}
                          <ChevronDown className="size-3.5 text-fd-muted-foreground" />
                        </>
                      );
                    })()}
                  </PopoverTrigger>
                  <PopoverContent className="flex flex-col overflow-auto">
                    {versionItems.map((item) => {
                      // Build the href for each version
                      const href =
                        item.label === 'Current'
                          ? `/api/${apiSection}`
                          : `/api/${apiSection}/${item.label}`;
                      // Highlight current version
                      const isCurrent =
                        (item.label === 'Current' && apiVersionSlug.length === 0) ||
                        (item.label !== 'Current' && apiVersionSlug[0] === item.label);
                      return (
                        <Link
                          key={item.label}
                          href={href}
                          className={cn(
                            'px-3 py-1 rounded hover:bg-fd-muted',
                            isCurrent && 'font-bold bg-fd-muted'
                          )}
                        >
                          API version: {item.label}
                        </Link>
                      );
                    })}
                  </PopoverContent>
                </Popover>
              )}
            </div>
          </div>
          <DocsDescription className="text-md mt-1">{page.data.description}</DocsDescription>
        </div>
      </div>
      <DocsBody>
        <MDXContent
          components={getMDXComponents({
            // this allows you to link to other pages with relative file paths
            a: createRelativeLink(source, page),
          })}
        />
        <PageFeedback />
      </DocsBody>
    </DocsPage>
  );
}

export async function generateStaticParams() {
  return source.generateParams();
}

export async function generateMetadata(props: {
  params: Promise<{ slug?: string[] }>;
}): Promise<Metadata> {
  const params = await props.params;
  const page = source.getPage(params.slug);
  if (!page) notFound();

  let title = `${page.data.title} | Cua`;
  if (page.url.includes('api')) title = `${page.data.title} | Cua API`;
  if (page.url.includes('guide')) title = `Guide: ${page.data.title} | Cua`;

  // Canonical URL points to cua.ai to consolidate all SEO authority on main domain
  const canonicalUrl = `https://cua.ai${page.url}`;
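  // e.g. (illustrative) page.url === '/docs/agent-sdk' -> 'https://cua.ai/docs/agent-sdk'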

  // Extract keywords from the page for SEO
  const keywords = [
    'computer use agent',
    'computer use',
    'AI automation',
    'visual automation',
    page.data.title,
  ];

  // Structured data for better Google indexing (TechArticle schema)
  const structuredData = {
    '@context': 'https://schema.org',
    '@type': 'TechArticle',
    headline: page.data.title,
    description: page.data.description,
    url: canonicalUrl,
    publisher: {
      '@type': 'Organization',
      name: 'Cua',
      url: 'https://cua.ai',
      logo: {
        '@type': 'ImageObject',
        url: 'https://cua.ai/cua_logo_black.svg',
      },
    },
    mainEntityOfPage: {
      '@type': 'WebPage',
      '@id': canonicalUrl,
    },
  };

  // Breadcrumb schema for better site structure understanding
  const breadcrumbSchema = {
    '@context': 'https://schema.org',
    '@type': 'BreadcrumbList',
    itemListElement: [
      {
        '@type': 'ListItem',
        position: 1,
        name: 'Cua',
        item: 'https://cua.ai',
      },
      {
        '@type': 'ListItem',
        position: 2,
        name: 'Documentation',
        item: 'https://cua.ai/docs',
      },
      {
        '@type': 'ListItem',
        position: 3,
        name: page.data.title,
        item: canonicalUrl,
      },
    ],
  };

  return {
    title,
    description: page.data.description,
    keywords,
    authors: [{ name: 'Cua', url: 'https://cua.ai' }],
    robots: {
      index: true,
      follow: true,
      googleBot: {
        index: true,
        follow: true,
        'max-image-preview': 'large',
        'max-snippet': -1,
      },
    },
    alternates: {
      canonical: canonicalUrl,
    },
    openGraph: {
      title,
      description: page.data.description,
      type: 'article',
      siteName: 'Cua',
      url: canonicalUrl,
    },
    twitter: {
      card: 'summary',
      title,
      description: page.data.description,
      creator: '@trycua',
    },
    other: {
      'script:ld+json': JSON.stringify([structuredData, breadcrumbSchema]),
    },
  };
}

```