#
tokens: 42340/50000 3/616 files (page 18/20)
lines: off (toggle) GitHub
raw markdown copy
This is page 18 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.

# Directory Structure

```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── bump-version.yml
│       ├── ci-lume.yml
│       ├── docker-publish-cua-linux.yml
│       ├── docker-publish-cua-windows.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── link-check.yml
│       ├── lint.yml
│       ├── npm-publish-cli.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       ├── python-tests.yml
│       ├── test-cua-models.yml
│       └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│   ├── docs.code-workspace
│   ├── extensions.json
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── cloud-windows-ga-macos-preview.md
│   ├── composite-agents.md
│   ├── computer-use-agents-for-growth-hacking.md
│   ├── cua-hackathon.md
│   ├── cua-playground-preview.md
│   ├── cua-vlm-router.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cli.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── neurips-2025-cua-papers.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .env.example
│   ├── .gitignore
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── observability.mdx
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── cua-vlm-router.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   ├── telemetry.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── cli-playbook
│   │       │   ├── commands.mdx
│   │       │   ├── index.mdx
│   │       │   └── meta.json
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── meta.json
│   │       │   ├── sandboxed-python.mdx
│   │       │   └── tracing-api.mdx
│   │       ├── example-usecases
│   │       │   ├── form-filling.mdx
│   │       │   ├── gemini-complex-ui-navigation.mdx
│   │       │   ├── meta.json
│   │       │   ├── post-event-contact-export.mdx
│   │       │   └── windows-app-behind-vpn.mdx
│   │       ├── get-started
│   │       │   ├── meta.json
│   │       │   └── quickstart.mdx
│   │       ├── index.mdx
│   │       ├── macos-vm-cli-playbook
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   └── meta.json
│   │       └── meta.json
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── bg-dark.jpg
│   │       ├── bg-light.jpg
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── grounding-with-gemini3.gif
│   │       ├── hero.png
│   │       ├── laminar_trace_example.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   ├── posthog
│   │   │   │   │   └── [...path]
│   │   │   │   │       └── route.ts
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   ├── llms.txt
│   │   │   │   └── route.ts
│   │   │   ├── robots.ts
│   │   │   └── sitemap.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── analytics-tracker.tsx
│   │   │   ├── cookie-consent.tsx
│   │   │   ├── doc-actions-menu.tsx
│   │   │   ├── editable-code-block.tsx
│   │   │   ├── footer.tsx
│   │   │   ├── hero.tsx
│   │   │   ├── iou.tsx
│   │   │   ├── mermaid.tsx
│   │   │   └── page-feedback.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   ├── mdx-components.tsx
│   │   └── providers
│   │       └── posthog-provider.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── browser_tool_example.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── tracing_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── cua_adapter.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gelato.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── generic_vlm.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   ├── uiins.py
│   │   │   │   │   ├── uitars.py
│   │   │   │   │   └── uitars2.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── tools
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── browser_tool.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer_agent.py
│   │   ├── bench-ui
│   │   │   ├── bench_ui
│   │   │   │   ├── __init__.py
│   │   │   │   ├── api.py
│   │   │   │   └── child.py
│   │   │   ├── examples
│   │   │   │   ├── folder_example.py
│   │   │   │   ├── gui
│   │   │   │   │   ├── index.html
│   │   │   │   │   ├── logo.svg
│   │   │   │   │   └── styles.css
│   │   │   │   ├── output_overlay.png
│   │   │   │   └── simple_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       └── test_port_detection.py
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── tracing_wrapper.py
│   │   │   │   ├── tracing.py
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_computer.py
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── browser.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   ├── utils
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   └── wallpaper.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   ├── test_connection.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_server.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_telemetry.py
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── build-extension.py
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── desktop-extension
│   │   │   │   ├── cua-extension.mcpb
│   │   │   │   ├── desktop_extension.png
│   │   │   │   ├── manifest.json
│   │   │   │   ├── README.md
│   │   │   │   ├── requirements.txt
│   │   │   │   ├── run_server.sh
│   │   │   │   └── setup.py
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── QUICK_TEST_COMMANDS.sh
│   │   │   ├── quick_test_local_option.py
│   │   │   ├── README.md
│   │   │   ├── scripts
│   │   │   │   ├── install_mcp_server.sh
│   │   │   │   └── start_mcp_server.sh
│   │   │   ├── test_mcp_server_local_option.py
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_mcp_server.py
│   │   ├── pylume
│   │   │   └── tests
│   │   │       ├── conftest.py
│   │   │       └── test_pylume.py
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           ├── conftest.py
│   │           └── test_omniparser.py
│   ├── qemu-docker
│   │   ├── linux
│   │   │   ├── Dockerfile
│   │   │   ├── README.md
│   │   │   └── src
│   │   │       ├── entry.sh
│   │   │       └── vm
│   │   │           ├── image
│   │   │           │   └── README.md
│   │   │           └── setup
│   │   │               ├── install.sh
│   │   │               ├── setup-cua-server.sh
│   │   │               └── setup.sh
│   │   ├── README.md
│   │   └── windows
│   │       ├── Dockerfile
│   │       ├── README.md
│   │       └── src
│   │           ├── entry.sh
│   │           └── vm
│   │               ├── image
│   │               │   └── README.md
│   │               └── setup
│   │                   ├── install.bat
│   │                   ├── on-logon.ps1
│   │                   ├── setup-cua-server.ps1
│   │                   ├── setup-utils.psm1
│   │                   └── setup.ps1
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── cua-cli
│   │   │   ├── .gitignore
│   │   │   ├── .prettierrc
│   │   │   ├── bun.lock
│   │   │   ├── CLAUDE.md
│   │   │   ├── index.ts
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── auth.ts
│   │   │   │   ├── cli.ts
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.ts
│   │   │   │   │   └── sandbox.ts
│   │   │   │   ├── config.ts
│   │   │   │   ├── http.ts
│   │   │   │   ├── storage.ts
│   │   │   │   └── util.ts
│   │   │   └── tsconfig.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Development.md
│       ├── Dockerfile
│       ├── Dockerfile.dev
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── install-cli.ps1
│   ├── install-cli.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   ├── run-docker-dev.sh
│   └── typescript-typecheck.js
├── TESTING.md
├── tests
│   ├── agent_loop_testing
│   │   ├── agent_test.py
│   │   └── README.md
│   ├── pytest.ini
│   ├── shell_cmd.py
│   ├── test_files.py
│   ├── test_mcp_server_session_management.py
│   ├── test_mcp_server_streaming.py
│   ├── test_shell_bash.py
│   ├── test_telemetry.py
│   ├── test_tracing.py
│   ├── test_venv.py
│   └── test_watchdog.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/libs/python/computer/computer/interface/generic.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import websockets
from PIL import Image

from ..logger import Logger, LogLevel
from ..utils import (
    bytes_to_image,
    decode_base64_image,
    draw_box,
    encode_base64_image,
    resize_image,
)
from .base import BaseComputerInterface
from .models import CommandResult, Key, KeyType, MouseButton


class GenericComputerInterface(BaseComputerInterface):
    """Generic interface with common functionality for all supported platforms (Windows, Linux, macOS)."""

    def __init__(
        self,
        ip_address: str,
        username: str = "lume",
        password: str = "lume",
        api_key: Optional[str] = None,
        vm_name: Optional[str] = None,
        logger_name: str = "computer.interface.generic",
        api_port: Optional[int] = None,
    ):
        super().__init__(ip_address, username, password, api_key, vm_name)
        self._ws = None
        self._reconnect_task = None
        self._closed = False
        self._last_ping = 0
        self._ping_interval = 5  # Send ping every 5 seconds
        self._ping_timeout = 120  # Wait 120 seconds for pong response
        self._reconnect_delay = 1  # Start with 1 second delay
        self._max_reconnect_delay = 30  # Maximum delay between reconnection attempts
        self._log_connection_attempts = True  # Flag to control connection attempt logging
        self._authenticated = False  # Track authentication status
        self._recv_lock = asyncio.Lock()  # Lock to ensure only one recv at a time

        # Set logger name for the interface
        self.logger = Logger(logger_name, LogLevel.NORMAL)

        # Store custom ports
        self._api_port = api_port

        # Optional default delay time between commands (in seconds)
        self.delay = 0.0

    async def _handle_delay(self, delay: Optional[float] = None):
        """Handle delay between commands using async sleep.

        Args:
            delay: Optional delay in seconds. If None, uses self.delay.
        """
        if delay is not None:
            if isinstance(delay, float) or isinstance(delay, int) and delay > 0:
                await asyncio.sleep(delay)
        elif isinstance(self.delay, float) or isinstance(self.delay, int) and self.delay > 0:
            await asyncio.sleep(self.delay)

    @property
    def ws_uri(self) -> str:
        """Get the WebSocket URI using the current IP address.

        Returns:
            WebSocket URI for the Computer API Server
        """
        protocol = "wss" if self.api_key else "ws"
        # Use custom API port if provided, otherwise use defaults based on API key
        port = (
            str(self._api_port)
            if self._api_port is not None
            else ("8443" if self.api_key else "8000")
        )
        return f"{protocol}://{self.ip_address}:{port}/ws"

    @property
    def rest_uri(self) -> str:
        """Get the REST URI using the current IP address.

        Returns:
            REST URI for the Computer API Server
        """
        protocol = "https" if self.api_key else "http"
        # Use custom API port if provided, otherwise use defaults based on API key
        port = (
            str(self._api_port)
            if self._api_port is not None
            else ("8443" if self.api_key else "8000")
        )
        return f"{protocol}://{self.ip_address}:{port}/cmd"

    # Mouse actions
    async def mouse_down(
        self,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: str = "left",
        delay: Optional[float] = None,
    ) -> None:
        await self._send_command("mouse_down", {"x": x, "y": y, "button": button})
        await self._handle_delay(delay)

    async def mouse_up(
        self,
        x: Optional[int] = None,
        y: Optional[int] = None,
        button: str = "left",
        delay: Optional[float] = None,
    ) -> None:
        await self._send_command("mouse_up", {"x": x, "y": y, "button": button})
        await self._handle_delay(delay)

    async def left_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        await self._send_command("left_click", {"x": x, "y": y})
        await self._handle_delay(delay)

    async def right_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        await self._send_command("right_click", {"x": x, "y": y})
        await self._handle_delay(delay)

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
    ) -> None:
        await self._send_command("double_click", {"x": x, "y": y})
        await self._handle_delay(delay)

    async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
        await self._send_command("move_cursor", {"x": x, "y": y})
        await self._handle_delay(delay)

    async def drag_to(
        self,
        x: int,
        y: int,
        button: "MouseButton" = "left",
        duration: float = 0.5,
        delay: Optional[float] = None,
    ) -> None:
        await self._send_command(
            "drag_to", {"x": x, "y": y, "button": button, "duration": duration}
        )
        await self._handle_delay(delay)

    async def drag(
        self,
        path: List[Tuple[int, int]],
        button: "MouseButton" = "left",
        duration: float = 0.5,
        delay: Optional[float] = None,
    ) -> None:
        await self._send_command("drag", {"path": path, "button": button, "duration": duration})
        await self._handle_delay(delay)

    # Keyboard Actions
    async def key_down(self, key: "KeyType", delay: Optional[float] = None) -> None:
        await self._send_command("key_down", {"key": key})
        await self._handle_delay(delay)

    async def key_up(self, key: "KeyType", delay: Optional[float] = None) -> None:
        await self._send_command("key_up", {"key": key})
        await self._handle_delay(delay)

    async def type_text(self, text: str, delay: Optional[float] = None) -> None:
        await self._send_command("type_text", {"text": text})
        await self._handle_delay(delay)

    async def press(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """Press a single key.

        Args:
            key: The key to press. Can be any of:
                - A Key enum value (recommended), e.g. Key.PAGE_DOWN
                - A direct key value string, e.g. 'pagedown'
                - A single character string, e.g. 'a'

        Examples:
            ```python
            # Using enum (recommended)
            await interface.press(Key.PAGE_DOWN)
            await interface.press(Key.ENTER)

            # Using direct values
            await interface.press('pagedown')
            await interface.press('enter')

            # Using single characters
            await interface.press('a')
            ```

        Raises:
            ValueError: If the key type is invalid or the key is not recognized
        """
        if isinstance(key, Key):
            actual_key = key.value
        elif isinstance(key, str):
            # Try to convert to enum if it matches a known key
            key_or_enum = Key.from_string(key)
            actual_key = key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum
        else:
            raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")

        await self._send_command("press_key", {"key": actual_key})
        await self._handle_delay(delay)

    async def press_key(self, key: "KeyType", delay: Optional[float] = None) -> None:
        """DEPRECATED: Use press() instead.

        This method is kept for backward compatibility but will be removed in a future version.
        Please use the press() method instead.
        """
        await self.press(key, delay)

    async def hotkey(self, *keys: "KeyType", delay: Optional[float] = None) -> None:
        """Press multiple keys simultaneously.

        Args:
            *keys: Multiple keys to press simultaneously. Each key can be any of:
                - A Key enum value (recommended), e.g. Key.COMMAND
                - A direct key value string, e.g. 'command'
                - A single character string, e.g. 'a'

        Examples:
            ```python
            # Using enums (recommended)
            await interface.hotkey(Key.COMMAND, Key.C)  # Copy
            await interface.hotkey(Key.COMMAND, Key.V)  # Paste

            # Using mixed formats
            await interface.hotkey(Key.COMMAND, 'a')  # Select all
            ```

        Raises:
            ValueError: If any key type is invalid or not recognized
        """
        actual_keys = []
        for key in keys:
            if isinstance(key, Key):
                actual_keys.append(key.value)
            elif isinstance(key, str):
                # Try to convert to enum if it matches a known key
                key_or_enum = Key.from_string(key)
                actual_keys.append(
                    key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum
                )
            else:
                raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")

        await self._send_command("hotkey", {"keys": actual_keys})
        await self._handle_delay(delay)

    # Scrolling Actions
    async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
        await self._send_command("scroll", {"x": x, "y": y})
        await self._handle_delay(delay)

    async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        await self._send_command("scroll_down", {"clicks": clicks})
        await self._handle_delay(delay)

    async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        await self._send_command("scroll_up", {"clicks": clicks})
        await self._handle_delay(delay)

    # Screen actions
    async def screenshot(
        self,
        boxes: Optional[List[Tuple[int, int, int, int]]] = None,
        box_color: str = "#FF0000",
        box_thickness: int = 2,
        scale_factor: float = 1.0,
    ) -> bytes:
        """Take a screenshot with optional box drawing and scaling.

        Args:
            boxes: Optional list of (x, y, width, height) tuples defining boxes to draw in screen coordinates
            box_color: Color of the boxes in hex format (default: "#FF0000" red)
            box_thickness: Thickness of the box borders in pixels (default: 2)
            scale_factor: Factor to scale the final image by (default: 1.0)
                         Use > 1.0 to enlarge, < 1.0 to shrink (e.g., 0.5 for half size, 2.0 for double)

        Returns:
            bytes: The screenshot image data, optionally with boxes drawn on it and scaled
        """
        result = await self._send_command("screenshot")
        if not result.get("image_data"):
            raise RuntimeError("Failed to take screenshot, no image data received from server")

        screenshot = decode_base64_image(result["image_data"])

        if boxes:
            # Get the natural scaling between screen and screenshot
            screen_size = await self.get_screen_size()
            screenshot_width, screenshot_height = bytes_to_image(screenshot).size
            width_scale = screenshot_width / screen_size["width"]
            height_scale = screenshot_height / screen_size["height"]

            # Scale box coordinates from screen space to screenshot space
            for box in boxes:
                scaled_box = (
                    int(box[0] * width_scale),  # x
                    int(box[1] * height_scale),  # y
                    int(box[2] * width_scale),  # width
                    int(box[3] * height_scale),  # height
                )
                screenshot = draw_box(
                    screenshot,
                    x=scaled_box[0],
                    y=scaled_box[1],
                    width=scaled_box[2],
                    height=scaled_box[3],
                    color=box_color,
                    thickness=box_thickness,
                )

        if scale_factor != 1.0:
            screenshot = resize_image(screenshot, scale_factor)

        return screenshot

    async def get_screen_size(self) -> Dict[str, int]:
        result = await self._send_command("get_screen_size")
        if result["success"] and result["size"]:
            return result["size"]
        raise RuntimeError("Failed to get screen size")

    async def get_cursor_position(self) -> Dict[str, int]:
        result = await self._send_command("get_cursor_position")
        if result["success"] and result["position"]:
            return result["position"]
        raise RuntimeError("Failed to get cursor position")

    # Clipboard Actions
    async def copy_to_clipboard(self) -> str:
        result = await self._send_command("copy_to_clipboard")
        if result["success"] and result["content"]:
            return result["content"]
        raise RuntimeError("Failed to get clipboard content")

    async def set_clipboard(self, text: str) -> None:
        await self._send_command("set_clipboard", {"text": text})

    # File Operations
    async def _write_bytes_chunked(
        self, path: str, content: bytes, append: bool = False, chunk_size: int = 1024 * 1024
    ) -> None:
        """Write large files in chunks to avoid memory issues."""
        total_size = len(content)
        current_offset = 0

        while current_offset < total_size:
            chunk_end = min(current_offset + chunk_size, total_size)
            chunk_data = content[current_offset:chunk_end]

            # First chunk uses the original append flag, subsequent chunks always append
            chunk_append = append if current_offset == 0 else True

            result = await self._send_command(
                "write_bytes",
                {
                    "path": path,
                    "content_b64": encode_base64_image(chunk_data),
                    "append": chunk_append,
                },
            )

            if not result.get("success", False):
                raise RuntimeError(result.get("error", "Failed to write file chunk"))

            current_offset = chunk_end

    async def write_bytes(self, path: str, content: bytes, append: bool = False) -> None:
        # For large files, use chunked writing
        if len(content) > 5 * 1024 * 1024:  # 5MB threshold
            await self._write_bytes_chunked(path, content, append)
            return

        result = await self._send_command(
            "write_bytes",
            {"path": path, "content_b64": encode_base64_image(content), "append": append},
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to write file"))

    async def _read_bytes_chunked(
        self, path: str, offset: int, total_length: int, chunk_size: int = 1024 * 1024
    ) -> bytes:
        """Read large files in chunks to avoid memory issues."""
        chunks = []
        current_offset = offset
        remaining = total_length

        while remaining > 0:
            read_size = min(chunk_size, remaining)
            result = await self._send_command(
                "read_bytes", {"path": path, "offset": current_offset, "length": read_size}
            )

            if not result.get("success", False):
                raise RuntimeError(result.get("error", "Failed to read file chunk"))

            content_b64 = result.get("content_b64", "")
            chunk_data = decode_base64_image(content_b64)
            chunks.append(chunk_data)

            current_offset += read_size
            remaining -= read_size

        return b"".join(chunks)

    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes:
        # For large files, use chunked reading
        if length is None:
            # Get file size first to determine if we need chunking
            file_size = await self.get_file_size(path)
            # If file is larger than 5MB, read in chunks
            if file_size > 5 * 1024 * 1024:  # 5MB threshold
                return await self._read_bytes_chunked(
                    path, offset, file_size - offset if offset > 0 else file_size
                )

        result = await self._send_command(
            "read_bytes", {"path": path, "offset": offset, "length": length}
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to read file"))
        content_b64 = result.get("content_b64", "")
        return decode_base64_image(content_b64)

    async def read_text(self, path: str, encoding: str = "utf-8") -> str:
        """Read text from a file with specified encoding.

        Args:
            path: Path to the file to read
            encoding: Text encoding to use (default: 'utf-8')

        Returns:
            str: The decoded text content of the file
        """
        content_bytes = await self.read_bytes(path)
        return content_bytes.decode(encoding)

    async def write_text(
        self, path: str, content: str, encoding: str = "utf-8", append: bool = False
    ) -> None:
        """Write text to a file with specified encoding.

        Args:
            path: Path to the file to write
            content: Text content to write
            encoding: Text encoding to use (default: 'utf-8')
            append: Whether to append to the file instead of overwriting
        """
        content_bytes = content.encode(encoding)
        await self.write_bytes(path, content_bytes, append)

    async def get_file_size(self, path: str) -> int:
        result = await self._send_command("get_file_size", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get file size"))
        return result.get("size", 0)

    async def file_exists(self, path: str) -> bool:
        result = await self._send_command("file_exists", {"path": path})
        return result.get("exists", False)

    async def directory_exists(self, path: str) -> bool:
        result = await self._send_command("directory_exists", {"path": path})
        return result.get("exists", False)

    async def create_dir(self, path: str) -> None:
        result = await self._send_command("create_dir", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to create directory"))

    async def delete_file(self, path: str) -> None:
        result = await self._send_command("delete_file", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to delete file"))

    async def delete_dir(self, path: str) -> None:
        result = await self._send_command("delete_dir", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to delete directory"))

    async def list_dir(self, path: str) -> list[str]:
        result = await self._send_command("list_dir", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to list directory"))
        return result.get("files", [])

    # Desktop actions
    async def get_desktop_environment(self) -> str:
        result = await self._send_command("get_desktop_environment")
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get desktop environment"))
        return result.get("environment", "unknown")

    async def set_wallpaper(self, path: str) -> None:
        result = await self._send_command("set_wallpaper", {"path": path})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to set wallpaper"))

    # Window management
    async def open(self, target: str) -> None:
        result = await self._send_command("open", {"target": target})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to open target"))

    async def launch(self, app: str, args: list[str] | None = None) -> int | None:
        payload: dict[str, object] = {"app": app}
        if args is not None:
            payload["args"] = args
        result = await self._send_command("launch", payload)
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to launch application"))
        return result.get("pid")  # type: ignore[return-value]

    async def get_current_window_id(self) -> int | str:
        result = await self._send_command("get_current_window_id")
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get current window id"))
        return result["window_id"]  # type: ignore[return-value]

    async def get_application_windows(self, app: str) -> list[int | str]:
        result = await self._send_command("get_application_windows", {"app": app})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get application windows"))
        return list(result.get("windows", []))  # type: ignore[return-value]

    async def get_window_name(self, window_id: int | str) -> str:
        result = await self._send_command("get_window_name", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get window name"))
        return result.get("name", "")  # type: ignore[return-value]

    async def get_window_size(self, window_id: int | str) -> tuple[int, int]:
        result = await self._send_command("get_window_size", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get window size"))
        return int(result.get("width", 0)), int(result.get("height", 0))

    async def get_window_position(self, window_id: int | str) -> tuple[int, int]:
        result = await self._send_command("get_window_position", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get window position"))
        return int(result.get("x", 0)), int(result.get("y", 0))

    async def set_window_size(self, window_id: int | str, width: int, height: int) -> None:
        result = await self._send_command(
            "set_window_size", {"window_id": window_id, "width": width, "height": height}
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to set window size"))

    async def set_window_position(self, window_id: int | str, x: int, y: int) -> None:
        result = await self._send_command(
            "set_window_position", {"window_id": window_id, "x": x, "y": y}
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to set window position"))

    async def maximize_window(self, window_id: int | str) -> None:
        result = await self._send_command("maximize_window", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to maximize window"))

    async def minimize_window(self, window_id: int | str) -> None:
        result = await self._send_command("minimize_window", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to minimize window"))

    async def activate_window(self, window_id: int | str) -> None:
        result = await self._send_command("activate_window", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to activate window"))

    async def close_window(self, window_id: int | str) -> None:
        result = await self._send_command("close_window", {"window_id": window_id})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to close window"))

    # Convenience aliases
    async def get_window_title(self, window_id: int | str) -> str:
        return await self.get_window_name(window_id)

    async def window_size(self, window_id: int | str) -> tuple[int, int]:
        return await self.get_window_size(window_id)

    # Command execution
    async def run_command(self, command: str) -> CommandResult:
        result = await self._send_command("run_command", {"command": command})
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to run command"))
        return CommandResult(
            stdout=result.get("stdout", ""),
            stderr=result.get("stderr", ""),
            returncode=result.get("return_code", 0),
        )

    # Accessibility Actions
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get the accessibility tree of the current screen."""
        result = await self._send_command("get_accessibility_tree")
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to get accessibility tree"))
        return result

    async def get_active_window_bounds(self) -> Dict[str, int]:
        """Get the bounds of the currently active window."""
        result = await self._send_command("get_active_window_bounds")
        if result["success"] and result["bounds"]:
            return result["bounds"]
        raise RuntimeError("Failed to get active window bounds")

    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert screenshot coordinates to screen coordinates.

        Args:
            x: X coordinate in screenshot space
            y: Y coordinate in screenshot space

        Returns:
            tuple[float, float]: (x, y) coordinates in screen space
        """
        screen_size = await self.get_screen_size()
        screenshot = await self.screenshot()
        screenshot_img = bytes_to_image(screenshot)
        screenshot_width, screenshot_height = screenshot_img.size

        # Calculate scaling factors
        width_scale = screen_size["width"] / screenshot_width
        height_scale = screen_size["height"] / screenshot_height

        # Convert coordinates
        screen_x = x * width_scale
        screen_y = y * height_scale

        return screen_x, screen_y

    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert screen coordinates to screenshot coordinates.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space
        """
        screen_size = await self.get_screen_size()
        screenshot = await self.screenshot()
        screenshot_img = bytes_to_image(screenshot)
        screenshot_width, screenshot_height = screenshot_img.size

        # Calculate scaling factors
        width_scale = screenshot_width / screen_size["width"]
        height_scale = screenshot_height / screen_size["height"]

        # Convert coordinates
        screenshot_x = x * width_scale
        screenshot_y = y * height_scale

        return screenshot_x, screenshot_y

    # Playwright browser control
    async def playwright_exec(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Execute a Playwright browser command.

        Args:
            command: The browser command to execute (visit_url, click, type, scroll, web_search)
            params: Command parameters

        Returns:
            Dict containing the command result

        Examples:
            # Navigate to a URL
            await interface.playwright_exec("visit_url", {"url": "https://example.com"})

            # Click at coordinates
            await interface.playwright_exec("click", {"x": 100, "y": 200})

            # Type text
            await interface.playwright_exec("type", {"text": "Hello, world!"})

            # Scroll
            await interface.playwright_exec("scroll", {"delta_x": 0, "delta_y": -100})

            # Web search
            await interface.playwright_exec("web_search", {"query": "computer use agent"})
        """
        protocol = "https" if self.api_key else "http"
        port = "8443" if self.api_key else "8000"
        url = f"{protocol}://{self.ip_address}:{port}/playwright_exec"

        payload = {"command": command, "params": params or {}}
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["X-API-Key"] = self.api_key
        if self.vm_name:
            headers["X-Container-Name"] = self.vm_name

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, headers=headers) as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        error_text = await response.text()
                        return {"success": False, "error": error_text}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Websocket Methods
    async def _keep_alive(self):
        """Keep the WebSocket connection alive with automatic reconnection."""
        retry_count = 0
        max_log_attempts = 1  # Only log the first attempt at INFO level
        log_interval = 500  # Then log every 500th attempt (significantly increased from 30)
        last_warning_time = 0
        min_warning_interval = 30  # Minimum seconds between connection lost warnings
        min_retry_delay = 0.5  # Minimum delay between connection attempts (500ms)

        while not self._closed:
            try:
                if self._ws is None or (
                    self._ws and self._ws.state == websockets.protocol.State.CLOSED
                ):
                    try:
                        retry_count += 1

                        # Add a minimum delay between connection attempts to avoid flooding
                        if retry_count > 1:
                            await asyncio.sleep(min_retry_delay)

                        # Only log the first attempt at INFO level, then every Nth attempt
                        if retry_count == 1:
                            self.logger.info(f"Attempting WebSocket connection to {self.ws_uri}")
                        elif retry_count % log_interval == 0:
                            self.logger.info(
                                f"Still attempting WebSocket connection (attempt {retry_count})..."
                            )
                        else:
                            # All other attempts are logged at DEBUG level
                            self.logger.debug(
                                f"Attempting WebSocket connection to {self.ws_uri} (attempt {retry_count})"
                            )

                        self._ws = await asyncio.wait_for(
                            websockets.connect(
                                self.ws_uri,
                                max_size=1024 * 1024 * 10,  # 10MB limit
                                max_queue=32,
                                ping_interval=self._ping_interval,
                                ping_timeout=self._ping_timeout,
                                close_timeout=5,
                                compression=None,  # Disable compression to reduce overhead
                            ),
                            timeout=120,
                        )
                        self.logger.info("WebSocket connection established")

                        # If api_key and vm_name are provided, perform authentication handshake
                        if self.api_key and self.vm_name:
                            self.logger.info("Performing authentication handshake...")
                            auth_message = {
                                "command": "authenticate",
                                "params": {"api_key": self.api_key, "container_name": self.vm_name},
                            }
                            await self._ws.send(json.dumps(auth_message))

                            # Wait for authentication response
                            async with self._recv_lock:
                                auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
                            auth_result = json.loads(auth_response)

                            if not auth_result.get("success"):
                                error_msg = auth_result.get("error", "Authentication failed")
                                self.logger.error(f"Authentication failed: {error_msg}")
                                await self._ws.close()
                                self._ws = None
                                raise ConnectionError(f"Authentication failed: {error_msg}")

                            self.logger.info("Authentication successful")

                        self._reconnect_delay = 1  # Reset reconnect delay on successful connection
                        self._last_ping = time.time()
                        retry_count = 0  # Reset retry count on successful connection
                    except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e:
                        next_retry = self._reconnect_delay

                        # Only log the first error at WARNING level, then every Nth attempt
                        if retry_count == 1:
                            self.logger.warning(
                                "Computer API Server not ready yet. Will retry automatically."
                            )
                        elif retry_count % log_interval == 0:
                            self.logger.warning(
                                f"Still waiting for Computer API Server (attempt {retry_count})..."
                            )
                        else:
                            # All other errors are logged at DEBUG level
                            self.logger.debug(f"Connection attempt {retry_count} failed: {e}")

                        if self._ws:
                            try:
                                await self._ws.close()
                            except:
                                pass
                        self._ws = None

                        # Use exponential backoff for connection retries
                        await asyncio.sleep(self._reconnect_delay)
                        self._reconnect_delay = min(
                            self._reconnect_delay * 2, self._max_reconnect_delay
                        )
                        continue

                # Regular ping to check connection
                if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                    try:
                        if time.time() - self._last_ping >= self._ping_interval:
                            pong_waiter = await self._ws.ping()
                            await asyncio.wait_for(pong_waiter, timeout=self._ping_timeout)
                            self._last_ping = time.time()
                    except Exception as e:
                        self.logger.debug(f"Ping failed: {e}")
                        if self._ws:
                            try:
                                await self._ws.close()
                            except:
                                pass
                        self._ws = None
                        continue

                await asyncio.sleep(1)

            except Exception as e:
                current_time = time.time()
                # Only log connection lost warnings at most once every min_warning_interval seconds
                if current_time - last_warning_time >= min_warning_interval:
                    self.logger.warning(
                        "Computer API Server connection lost. Will retry automatically."
                    )
                    last_warning_time = current_time
                else:
                    # Log at debug level instead
                    self.logger.debug(f"Connection lost: {e}")

                if self._ws:
                    try:
                        await self._ws.close()
                    except:
                        pass
                self._ws = None

    async def _ensure_connection(self):
        """Ensure WebSocket connection is established."""
        if self._reconnect_task is None or self._reconnect_task.done():
            self._reconnect_task = asyncio.create_task(self._keep_alive())

        retry_count = 0
        max_retries = 5

        while retry_count < max_retries:
            try:
                if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                    return
                retry_count += 1
                await asyncio.sleep(1)
            except Exception as e:
                # Only log at ERROR level for the last retry attempt
                if retry_count == max_retries - 1:
                    self.logger.error(
                        f"Persistent connection check error after {retry_count} attempts: {e}"
                    )
                else:
                    self.logger.debug(f"Connection check error (attempt {retry_count}): {e}")
                retry_count += 1
                await asyncio.sleep(1)
                continue

        raise ConnectionError("Failed to establish WebSocket connection after multiple retries")

    async def _send_command_ws(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send command through WebSocket."""
        max_retries = 3
        retry_count = 0
        last_error = None

        # Acquire lock to ensure only one command is processed at a time
        self.logger.debug(f"Acquired lock for command: {command}")
        while retry_count < max_retries:
            try:
                await self._ensure_connection()
                if not self._ws:
                    raise ConnectionError("WebSocket connection is not established")

                message = {"command": command, "params": params or {}}
                await self._ws.send(json.dumps(message))
                async with self._recv_lock:
                    response = await asyncio.wait_for(self._ws.recv(), timeout=120)
                self.logger.debug(f"Completed command: {command}")
                return json.loads(response)
            except Exception as e:
                last_error = e
                retry_count += 1
                if retry_count < max_retries:
                    # Only log at debug level for intermediate retries
                    self.logger.debug(
                        f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
                    )
                    await asyncio.sleep(1)
                    continue
                else:
                    # Only log at error level for the final failure
                    self.logger.error(
                        f"Failed to send command '{command}' after {max_retries} retries"
                    )
                    self.logger.debug(f"Command failure details: {e}")
                    raise

        raise last_error if last_error else RuntimeError("Failed to send command")

    async def _send_command_rest(
        self, command: str, params: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Send command through REST API without retries or connection management."""
        try:
            # Prepare the request payload
            payload = {"command": command, "params": params or {}}

            # Prepare headers
            headers = {"Content-Type": "application/json"}
            if self.api_key:
                headers["X-API-Key"] = self.api_key
            if self.vm_name:
                headers["X-Container-Name"] = self.vm_name

            # Send the request
            async with aiohttp.ClientSession() as session:
                async with session.post(self.rest_uri, json=payload, headers=headers) as response:
                    # Get the response text
                    response_text = await response.text()

                    # Trim whitespace
                    response_text = response_text.strip()

                    # Check if it starts with "data: "
                    if response_text.startswith("data: "):
                        # Extract everything after "data: "
                        json_str = response_text[6:]  # Remove "data: " prefix
                        try:
                            return json.loads(json_str)
                        except json.JSONDecodeError:
                            return {
                                "success": False,
                                "error": "Server returned malformed response",
                                "message": response_text,
                            }
                    else:
                        # Return error response
                        return {
                            "success": False,
                            "error": "Server returned malformed response",
                            "message": response_text,
                        }

        except Exception as e:
            return {"success": False, "error": "Request failed", "message": str(e)}

    async def _send_command(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """Send command using REST API with WebSocket fallback."""
        # Try REST API first
        result = await self._send_command_rest(command, params)

        # If REST failed with "Request failed", try WebSocket as fallback
        if not result.get("success", True) and (
            result.get("error") == "Request failed"
            or result.get("error") == "Server returned malformed response"
        ):
            self.logger.warning(
                f"REST API failed for command '{command}', trying WebSocket fallback"
            )
            try:
                return await self._send_command_ws(command, params)
            except Exception as e:
                self.logger.error(f"WebSocket fallback also failed: {e}")
                # Return the original REST error
                return result

        return result

    async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
        """Wait for Computer API Server to be ready by testing version command."""

        # Check if REST API is available
        try:
            result = await self._send_command_rest("version", {})
            assert result.get("success", True)
        except Exception as e:
            self.logger.debug(
                f"REST API failed for command 'version', trying WebSocket fallback: {e}"
            )
            try:
                await self._wait_for_ready_ws(timeout, interval)
                return
            except Exception as e:
                self.logger.debug(f"WebSocket fallback also failed: {e}")
                raise e

        start_time = time.time()
        last_error = None
        attempt_count = 0
        progress_interval = 10  # Log progress every 10 seconds
        last_progress_time = start_time

        try:
            self.logger.info(
                f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
            )

            # Wait for the server to respond to get_screen_size command
            while time.time() - start_time < timeout:
                try:
                    attempt_count += 1
                    current_time = time.time()

                    # Log progress periodically without flooding logs
                    if current_time - last_progress_time >= progress_interval:
                        elapsed = current_time - start_time
                        self.logger.info(
                            f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                        )
                        last_progress_time = current_time

                    # Test the server with a simple get_screen_size command
                    result = await self._send_command("get_screen_size")
                    if result.get("success", False):
                        elapsed = time.time() - start_time
                        self.logger.info(
                            f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                        )
                        return  # Server is ready
                    else:
                        last_error = result.get("error", "Unknown error")
                        self.logger.debug(f"Initial connection command failed: {last_error}")

                except Exception as e:
                    last_error = e
                    self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")

                # Wait before trying again
                await asyncio.sleep(interval)

            # If we get here, we've timed out
            error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
            if last_error:
                error_msg += f": {str(last_error)}"
            self.logger.error(error_msg)
            raise TimeoutError(error_msg)

        except Exception as e:
            if isinstance(e, TimeoutError):
                raise
            error_msg = f"Error while waiting for server: {str(e)}"
            self.logger.error(error_msg)
            raise RuntimeError(error_msg)

    async def _wait_for_ready_ws(self, timeout: int = 60, interval: float = 1.0):
        """Wait for WebSocket connection to become available."""
        start_time = time.time()
        last_error = None
        attempt_count = 0
        progress_interval = 10  # Log progress every 10 seconds
        last_progress_time = start_time

        # Disable detailed logging for connection attempts
        self._log_connection_attempts = False

        try:
            self.logger.info(
                f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
            )

            # Start the keep-alive task if it's not already running
            if self._reconnect_task is None or self._reconnect_task.done():
                self._reconnect_task = asyncio.create_task(self._keep_alive())

            # Wait for the connection to be established
            while time.time() - start_time < timeout:
                try:
                    attempt_count += 1
                    current_time = time.time()

                    # Log progress periodically without flooding logs
                    if current_time - last_progress_time >= progress_interval:
                        elapsed = current_time - start_time
                        self.logger.info(
                            f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                        )
                        last_progress_time = current_time

                    # Check if we have a connection
                    if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                        # Test the connection with a simple command
                        try:
                            await self._send_command_ws("get_screen_size")
                            elapsed = time.time() - start_time
                            self.logger.info(
                                f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                            )
                            return  # Connection is fully working
                        except Exception as e:
                            last_error = e
                            self.logger.debug(f"Connection test failed: {e}")

                    # Wait before trying again
                    await asyncio.sleep(interval)

                except Exception as e:
                    last_error = e
                    self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")
                    await asyncio.sleep(interval)

            # If we get here, we've timed out
            error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
            if last_error:
                error_msg += f": {str(last_error)}"
            self.logger.error(error_msg)
            raise TimeoutError(error_msg)
        finally:
            # Reset to default logging behavior
            self._log_connection_attempts = False

    def close(self):
        """Close WebSocket connection.

        Note: In host computer server mode, we leave the connection open
        to allow other clients to connect to the same server. The server
        will handle cleaning up idle connections.
        """
        # Only cancel the reconnect task
        if self._reconnect_task:
            self._reconnect_task.cancel()

        # Don't set closed flag or close websocket by default
        # This allows the server to stay connected for other clients
        # self._closed = True
        # if self._ws:
        #     asyncio.create_task(self._ws.close())
        #     self._ws = None

    def force_close(self):
        """Force close the WebSocket connection.

        This method should be called when you want to completely
        shut down the connection, not just for regular cleanup.
        """
        self._closed = True
        if self._reconnect_task:
            self._reconnect_task.cancel()
        if self._ws:
            asyncio.create_task(self._ws.close())
            self._ws = None

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/draw.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Diorama Renderer - A tool for rendering selective views of macOS desktops

This script renders filtered views of the macOS desktop, preserving only selected applications
while maintaining system UI elements like menubar and dock. Each "diorama" shows a consistent
view of the system while isolating specific applications.

The image is "smart resized" to remove any empty space around the menubar and dock.

Key features:
- Captures shared window state, z-order and position information
- Filters windows by application based on whitelist
- Preserves system context (menubar, dock) in each view
- Preserves menu-owning / keyboard-focused window in each view
- Supports parallel views of the same desktop for multi-agent systems
"""

import argparse
import asyncio
import functools
import io
import json
import logging
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple

from PIL import Image, ImageDraw

# simple, nicely formatted logging
logger = logging.getLogger(__name__)

from computer_server.diorama.safezone import (
    get_dock_bounds,
    get_menubar_bounds,
)


# Timing decorator for profiling
def timing_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.debug(f"Function {func.__name__} took {elapsed_time:.4f} seconds to run")
        return result

    return wrapper


# Import Objective-C bridge libraries
try:
    import AppKit
    import Foundation
    import objc
    import Quartz
    from AppKit import NSApp, NSApplication, NSRunningApplication, NSWorkspace
    from ApplicationServices import AXUIElementCopyAttributeValue  # type: ignore
    from ApplicationServices import AXUIElementCopyAttributeValues  # type: ignore
    from ApplicationServices import AXUIElementCreateApplication  # type: ignore
    from ApplicationServices import AXUIElementCreateSystemWide  # type: ignore
    from ApplicationServices import AXUIElementGetTypeID  # type: ignore
    from ApplicationServices import AXValueGetType  # type: ignore
    from ApplicationServices import AXValueGetValue  # type: ignore
    from ApplicationServices import kAXChildrenAttribute  # type: ignore
    from ApplicationServices import kAXDescriptionAttribute  # type: ignore
    from ApplicationServices import kAXEnabledAttribute  # type: ignore
    from ApplicationServices import kAXErrorSuccess  # type: ignore
    from ApplicationServices import kAXFocusedApplicationAttribute  # type: ignore
    from ApplicationServices import kAXFocusedUIElementAttribute  # type: ignore
    from ApplicationServices import kAXFocusedWindowAttribute  # type: ignore
    from ApplicationServices import kAXMainWindowAttribute  # type: ignore
    from ApplicationServices import kAXPositionAttribute  # type: ignore
    from ApplicationServices import kAXRoleAttribute  # type: ignore
    from ApplicationServices import kAXRoleDescriptionAttribute  # type: ignore
    from ApplicationServices import kAXSelectedTextAttribute  # type: ignore
    from ApplicationServices import kAXSelectedTextRangeAttribute  # type: ignore
    from ApplicationServices import kAXSizeAttribute  # type: ignore
    from ApplicationServices import kAXTitleAttribute  # type: ignore
    from ApplicationServices import kAXValueAttribute  # type: ignore
    from ApplicationServices import kAXValueCFRangeType  # type: ignore
    from ApplicationServices import kAXValueCGPointType  # type: ignore
    from ApplicationServices import kAXValueCGSizeType  # type: ignore
    from ApplicationServices import kAXVisibleChildrenAttribute  # type: ignore
    from ApplicationServices import kAXWindowsAttribute  # type: ignore
    from Foundation import NSMakeRect, NSObject
except ImportError:
    logger.error("Error: This script requires PyObjC to be installed.")
    logger.error("Please install it with: pip install pyobjc")
    sys.exit(1)

# Constants for accessibility API
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1,  # NSApplicationActivateIgnoringOtherApps
}


def CFAttributeToPyObject(attrValue):
    def list_helper(list_value):
        list_builder = []
        for item in list_value:
            list_builder.append(CFAttributeToPyObject(item))
        return list_builder

    def number_helper(number_value):
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(element_value):
        return element_value

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    try:
        return cf_type_mapping[cf_attr_type](attrValue)
    except KeyError:
        # did not get a supported CF type. Move on to AX type
        pass

    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        search_result = re.search("{.*}", attrValue.description())
        if search_result:
            extracted_str = search_result.group()
            return tuple(ax_type_map[ax_attr_type](extracted_str))
        return None
    except KeyError:
        return None


def element_attribute(element, attribute):
    if attribute == kAXChildrenAttribute:
        err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if err == kAXErrorSuccess:
            if isinstance(value, Foundation.NSArray):  # type: ignore
                return CFAttributeToPyObject(value)
            else:
                return value
    err, value = AXUIElementCopyAttributeValue(element, attribute, None)
    if err == kAXErrorSuccess:
        if isinstance(value, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(value)
        else:
            return value
    return None


def element_value(element, type):
    err, value = AXValueGetValue(element, type, None)
    if err == True:
        return value
    return None


@timing_decorator
def get_running_apps() -> List[NSRunningApplication]:
    """Get list of all running applications

    Returns:
        List of NSRunningApplication objects
    """
    return NSWorkspace.sharedWorkspace().runningApplications()


# @timing_decorator
def get_app_info(app: NSRunningApplication) -> Dict[str, Any]:
    """Get information about an application

    Args:
        app: NSRunningApplication object

    Returns:
        Dictionary with application information
    """
    return {
        "name": app.localizedName(),
        "bundle_id": app.bundleIdentifier(),
        "pid": app.processIdentifier(),
        "active": app.isActive(),
        "hidden": app.isHidden(),
        "terminated": app.isTerminated(),
    }


@timing_decorator
def get_all_windows() -> List[Dict[str, Any]]:
    """Get all windows from all applications with z-order information

    Returns:
        List of window dictionaries with z-order information
    """
    # Get all windows from Quartz
    # The kCGWindowListOptionOnScreenOnly flag gets only visible windows with preserved z-order
    window_list = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID
    )

    # Create a dictionary of window z-order
    z_order = {
        window["kCGWindowNumber"]: z_index for z_index, window in enumerate(window_list[::-1])
    }

    # The kCGWindowListOptionAll flag gets all windows *without* z-order preserved
    window_list_all = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID
    )

    # Process all windows
    windows = []
    for window in window_list_all:
        # We track z_index which is the index in the window list (0 is the desktop / background)

        # Get window properties
        window_id = window.get("kCGWindowNumber", 0)
        window_name = window.get("kCGWindowName", "")
        window_pid = window.get("kCGWindowOwnerPID", 0)
        window_bounds = window.get("kCGWindowBounds", {})
        window_owner = window.get("kCGWindowOwnerName", "")
        window_is_on_screen = window.get("kCGWindowIsOnscreen", False)

        # Get z-order information
        # Note: kCGWindowLayer provides the system's layer value (lower values are higher in the stack)
        layer = window.get(kCGWindowLayer, 0)
        opacity = window.get(kCGWindowAlpha, 1.0)
        z_index = z_order.get(window_id, -1)

        # Determine window role (desktop, dock, menubar, app)
        if window_name == "Dock" and window_owner == "Dock":
            role = "dock"
        elif window_name == "Menubar" and window_owner == "Window Server":
            role = "menubar"
        elif window_owner in ["Window Server", "Dock"]:
            role = "desktop"
        else:
            role = "app"

        # Only include windows with valid bounds
        if window_bounds:
            windows.append(
                {
                    "id": window_id,
                    "name": window_name or "Unnamed Window",
                    "pid": window_pid,
                    "owner": window_owner,
                    "role": role,
                    "is_on_screen": window_is_on_screen,
                    "bounds": {
                        "x": window_bounds.get("X", 0),
                        "y": window_bounds.get("Y", 0),
                        "width": window_bounds.get("Width", 0),
                        "height": window_bounds.get("Height", 0),
                    },
                    "layer": layer,  # System layer (lower values are higher in stack)
                    "z_index": z_index,  # Our z-index (0 is the desktop)
                    "opacity": opacity,
                }
            )

    windows = sorted(windows, key=lambda x: x["z_index"])

    return windows


def get_app_windows(app_pid: int, all_windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Get all windows for a specific application

    Args:
        app_pid: Process ID of the application
        all_windows: List of all windows with z-order information

    Returns:
        List of window dictionaries for the app
    """
    # Filter windows by PID
    return [window for window in all_windows if window["pid"] == app_pid]


@timing_decorator
def draw_desktop_screenshot(
    app_whitelist: List[str] = None,
    all_windows: List[Dict[str, Any]] = None,
    dock_bounds: Dict[str, float] = None,
    dock_items: List[Dict[str, Any]] = None,
    menubar_bounds: Dict[str, float] = None,
    menubar_items: List[Dict[str, Any]] = None,
) -> Tuple[Optional[Image.Image], List[Dict[str, Any]]]:
    """Capture a screenshot of the entire desktop using Quartz compositing, including dock as a second pass.
    Args:
        app_whitelist: Optional list of app names to include in the screenshot
    Returns:
        PIL Image of the desktop or None if capture failed
    """
    import ctypes

    if dock_bounds is None:
        dock_bounds = get_dock_bounds()
    if dock_items is None:
        dock_items = get_dock_items()
    if menubar_bounds is None:
        menubar_bounds = get_menubar_bounds()
    if menubar_items is None:
        menubar_items = get_menubar_items()
    if all_windows is None:
        all_windows = get_all_windows()
    all_windows = all_windows[::-1]
    all_windows = [window for window in all_windows if window["is_on_screen"]]

    main_screen = AppKit.NSScreen.mainScreen()
    if main_screen:
        frame = main_screen.frame()
        screen_rect = Quartz.CGRectMake(0, 0, frame.size.width, frame.size.height)
    else:
        screen_rect = Quartz.CGRectNull

    # Screenshot-to-screen hitboxes
    hitboxes = []

    if app_whitelist is None:
        # Single pass: desktop, menubar, app, dock
        window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
        for window in all_windows:
            Foundation.CFArrayAppendValue(window_list, window["id"])
        cg_image = Quartz.CGWindowListCreateImageFromArray(
            screen_rect, window_list, Quartz.kCGWindowImageDefault
        )
        if cg_image is None:
            return None

        # Create CGContext for compositing
        width = int(frame.size.width)
        height = int(frame.size.height)
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )
        Quartz.CGContextDrawImage(cg_context, screen_rect, cg_image)
        hitboxes.append({"hitbox": [0, 0, width, height], "target": [0, 0, width, height]})
    else:
        # Filter out windows that are not in the whitelist
        all_windows = [
            window
            for window in all_windows
            if window["owner"] in app_whitelist or window["role"] != "app"
        ]
        app_windows = [window for window in all_windows if window["role"] == "app"]

        dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom"

        menubar_length = (
            max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items)
            if menubar_items
            else 0
        )

        # Calculate bounds of app windows
        app_bounds = {
            "x": min(window["bounds"]["x"] for window in app_windows) if app_windows else 0,
            "y": min(window["bounds"]["y"] for window in app_windows) if app_windows else 0,
        }
        app_bounds["width"] = (
            max(window["bounds"]["x"] + window["bounds"]["width"] for window in app_windows)
            - app_bounds["x"]
            if app_windows
            else 0
        )
        app_bounds["height"] = (
            max(window["bounds"]["y"] + window["bounds"]["height"] for window in app_windows)
            - app_bounds["y"]
            if app_windows
            else 0
        )

        # Set minimum bounds of 256x256
        app_bounds["width"] = max(app_bounds["width"], 256)
        app_bounds["height"] = max(app_bounds["height"], 256)

        # Add dock bounds to app bounds
        if dock_orientation == "bottom":
            app_bounds["height"] += dock_bounds["height"] + 4
        elif dock_orientation == "side":
            if dock_bounds["x"] > frame.size.width / 2:
                app_bounds["width"] += dock_bounds["width"] + 4
            else:
                app_bounds["x"] -= dock_bounds["width"] + 4
                app_bounds["width"] += dock_bounds["width"] + 4

        # Add menubar bounds to app bounds
        app_bounds["height"] += menubar_bounds["height"]

        # Make sure app bounds contains menubar bounds
        app_bounds["width"] = max(app_bounds["width"], menubar_length)

        # Clamp bounds to screen
        app_bounds["x"] = max(app_bounds["x"], 0)
        app_bounds["y"] = max(app_bounds["y"], 0)
        app_bounds["width"] = min(app_bounds["width"], frame.size.width - app_bounds["x"])
        app_bounds["height"] = min(
            app_bounds["height"], frame.size.height - app_bounds["y"] + menubar_bounds["height"]
        )

        # Create CGContext for compositing
        width = int(app_bounds["width"])
        height = int(app_bounds["height"])
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )

        def _draw_layer(cg_context, all_windows, source_rect, target_rect):
            """Draw a layer of windows from source_rect to target_rect on the given context."""
            window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
            for window in all_windows:
                Foundation.CFArrayAppendValue(window_list, window["id"])
            cg_image = Quartz.CGWindowListCreateImageFromArray(
                source_rect, window_list, Quartz.kCGWindowImageDefault
            )
            if cg_image is not None:
                Quartz.CGContextDrawImage(cg_context, target_rect, cg_image)

        # --- FIRST PASS: desktop, apps ---
        source_position = [app_bounds["x"], app_bounds["y"]]
        source_size = [app_bounds["width"], app_bounds["height"]]
        target_position = [0, min(menubar_bounds["y"] + menubar_bounds["height"], app_bounds["y"])]
        target_size = [app_bounds["width"], app_bounds["height"]]

        if dock_orientation == "bottom":
            source_size[1] += dock_bounds["height"]
            target_size[1] += dock_bounds["height"]
        elif dock_orientation == "side":
            if dock_bounds["x"] < frame.size.width / 2:
                source_position[0] -= dock_bounds["width"]
                target_position[0] -= dock_bounds["width"]
            source_size[0] += dock_bounds["width"]
            target_size[0] += dock_bounds["width"]

        app_source_rect = Quartz.CGRectMake(
            source_position[0], source_position[1], source_size[0], source_size[1]
        )
        app_target_rect = Quartz.CGRectMake(
            target_position[0],
            app_bounds["height"] - target_position[1] - target_size[1],
            target_size[0],
            target_size[1],
        )
        first_pass_windows = [
            w for w in all_windows if w["role"] == "app" or w["role"] == "desktop"
        ]
        _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect)

        hitboxes.append(
            {
                "hitbox": [
                    0,
                    menubar_bounds["height"],
                    app_bounds["width"],
                    menubar_bounds["height"] + app_bounds["height"],
                ],
                "target": [
                    app_source_rect.origin.x,
                    app_source_rect.origin.y,
                    app_source_rect.origin.x + app_bounds["width"],
                    app_source_rect.origin.y + app_bounds["height"],
                ],
            }
        )

        # --- SECOND PASS: menubar ---
        allowed_roles = {"menubar"}
        menubar_windows = [w for w in all_windows if w["role"] in allowed_roles]
        menubar_source_rect = Quartz.CGRectMake(0, 0, app_bounds["width"], menubar_bounds["height"])
        menubar_target_rect = Quartz.CGRectMake(
            0,
            app_bounds["height"] - menubar_bounds["height"],
            app_bounds["width"],
            menubar_bounds["height"],
        )
        _draw_layer(cg_context, menubar_windows, menubar_source_rect, menubar_target_rect)

        hitboxes.append(
            {
                "hitbox": [0, 0, app_bounds["width"], menubar_bounds["height"]],
                "target": [0, 0, app_bounds["width"], menubar_bounds["height"]],
            }
        )

        # --- THIRD PASS: dock, filtered ---
        # Step 1: Collect dock items to draw, with their computed target rects
        dock_draw_items = []
        for index, item in enumerate(dock_items):
            source_position = (item["bounds"]["x"], item["bounds"]["y"])
            source_size = (item["bounds"]["width"], item["bounds"]["height"])

            # apply whitelist to middle items
            if not (index == 0 or index == len(dock_items) - 1):
                if item["subrole"] == "AXApplicationDockItem":
                    if item["title"] not in app_whitelist:
                        continue
                elif item["subrole"] == "AXMinimizedWindowDockItem":
                    if not any(
                        window["name"] == item["title"]
                        and window["role"] == "app"
                        and window["owner"] in app_whitelist
                        for window in all_windows
                    ):
                        continue
                elif item["subrole"] == "AXFolderDockItem":
                    continue

            # Preserve unscaled (original) source position and size before any modification
            hitbox_position = source_position
            hitbox_size = source_size

            screen_position = source_position
            screen_size = source_size

            # stretch to screen size
            padding = 32
            if dock_orientation == "bottom":
                source_position = (source_position[0], 0)
                source_size = (source_size[0], frame.size.height)

                hitbox_position = (source_position[0], app_bounds["height"] - hitbox_size[1])
                hitbox_size = (source_size[0], hitbox_size[1])

                if index == 0:
                    source_size = (padding + source_size[0], source_size[1])
                    source_position = (source_position[0] - padding, 0)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0] + padding, source_size[1])
                    source_position = (source_position[0], 0)

            elif dock_orientation == "side":
                source_position = (0, source_position[1])
                source_size = (frame.size.width, source_size[1])

                hitbox_position = (
                    (
                        source_position[0]
                        if dock_bounds["x"] < frame.size.width / 2
                        else app_bounds["width"] - hitbox_size[0]
                    ),
                    source_position[1],
                )
                hitbox_size = (hitbox_size[0], source_size[1])

                if index == 0:
                    source_size = (source_size[0], padding + source_size[1])
                    source_position = (0, source_position[1] - padding)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0], source_size[1] + padding)
                    source_position = (0, source_position[1])

            # Compute the initial target position
            target_position = source_position
            target_size = source_size

            dock_draw_items.append(
                {
                    "item": item,
                    "index": index,
                    "source_position": source_position,
                    "source_size": source_size,
                    "target_size": target_size,
                    "target_position": target_position,  # Will be updated after packing
                    "hitbox_position": hitbox_position,
                    "hitbox_size": hitbox_size,
                    "screen_position": screen_position,
                    "screen_size": screen_size,
                }
            )

        # Step 2: Pack the target rects along the main axis, removing gaps
        packed_positions = []
        if dock_orientation == "bottom":
            # Pack left-to-right
            x_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((x_cursor, draw_item["target_position"][1]))
                x_cursor += draw_item["target_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds["width"] - packed_strip_length) / 2
            y_offset = frame.size.height - app_bounds["height"]
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px + x_offset, py - y_offset)

            # Pack unscaled source rects
            x_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (x_cursor, draw_item["hitbox_position"][1])
                x_cursor += draw_item["hitbox_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds["width"] - packed_strip_length) / 2
            for i, draw_item in enumerate(dock_draw_items):
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px + x_offset, py)
        elif dock_orientation == "side":
            # Pack top-to-bottom
            y_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((draw_item["target_position"][0], y_cursor))
                y_cursor += draw_item["target_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds["height"] - packed_strip_length) / 2
            x_offset = (
                0
                if dock_bounds["x"] < frame.size.width / 2
                else frame.size.width - app_bounds["width"]
            )
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px - x_offset, py + y_offset)

            # Pack unscaled source rects
            y_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (draw_item["hitbox_position"][0], y_cursor)
                y_cursor += draw_item["hitbox_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds["height"] - packed_strip_length) / 2
            for i, draw_item in enumerate(dock_draw_items):
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px, py + y_offset)

        dock_windows = [window for window in all_windows if window["role"] == "dock"]
        # Step 3: Draw dock items using packed and recentered positions
        for draw_item in dock_draw_items:
            item = draw_item["item"]
            source_position = draw_item["source_position"]
            source_size = draw_item["source_size"]
            target_position = draw_item["target_position"]
            target_size = draw_item["target_size"]

            # flip target position y
            target_position = (
                target_position[0],
                app_bounds["height"] - target_position[1] - target_size[1],
            )

            source_rect = Quartz.CGRectMake(*source_position, *source_size)
            target_rect = Quartz.CGRectMake(*target_position, *target_size)

            _draw_layer(cg_context, dock_windows, source_rect, target_rect)

            hitbox_position = draw_item["hitbox_position"]
            hitbox_size = draw_item["hitbox_size"]

            # Debug: Draw true hitbox rect (packed position, unscaled size)
            # # Flip y like target_rect
            # hitbox_position_flipped = (
            #     hitbox_position[0],
            #     app_bounds['height'] - hitbox_position[1] - hitbox_size[1]
            # )
            # hitbox_rect = Quartz.CGRectMake(*hitbox_position_flipped, *hitbox_size)
            # Quartz.CGContextSetStrokeColorWithColor(cg_context, Quartz.CGColorCreateGenericRGB(0, 1, 0, 1))
            # Quartz.CGContextStrokeRect(cg_context, hitbox_rect)

            hitboxes.append(
                {
                    "hitbox": [
                        *hitbox_position,
                        hitbox_position[0] + hitbox_size[0],
                        hitbox_position[1] + hitbox_size[1],
                    ],
                    "target": [
                        *draw_item["screen_position"],
                        draw_item["screen_position"][0] + draw_item["screen_size"][0],
                        draw_item["screen_position"][1] + draw_item["screen_size"][1],
                    ],
                }
            )

    # Convert composited context to CGImage
    final_cg_image = Quartz.CGBitmapContextCreateImage(cg_context)
    ns_image = AppKit.NSImage.alloc().initWithCGImage_size_(final_cg_image, Foundation.NSZeroSize)
    ns_data = ns_image.TIFFRepresentation()
    bitmap_rep = AppKit.NSBitmapImageRep.imageRepWithData_(ns_data)
    png_data = bitmap_rep.representationUsingType_properties_(AppKit.NSBitmapImageFileTypePNG, None)
    image_data = io.BytesIO(png_data)
    return Image.open(image_data), hitboxes


@timing_decorator
def get_menubar_items(active_app_pid: int = None) -> List[Dict[str, Any]]:
    """Get menubar items from the active application using Accessibility API

    Args:
        active_app_pid: PID of the active application

    Returns:
        List of dictionaries with menubar item information
    """
    menubar_items = []

    if active_app_pid is None:
        # Get the frontmost application's PID if none provided
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if frontmost_app:
            active_app_pid = frontmost_app.processIdentifier()
        else:
            logger.error("Error: Could not determine frontmost application")
            return menubar_items

    # Create an accessibility element for the application
    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        logger.error(f"Error: Could not create accessibility element for PID {active_app_pid}")
        return menubar_items

    # Get the menubar
    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        logger.error(f"Error: Could not get menubar for application with PID {active_app_pid}")
        return menubar_items

    # Get the menubar items
    children = element_attribute(menubar, kAXChildrenAttribute)
    if children is None:
        logger.error("Error: Could not get menubar items")
        return menubar_items

    # Process each menubar item
    for i in range(len(children)):
        item = children[i]

        # Get item title
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"

        # Create bounding box
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        # Get item position
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            position_value = element_value(position_value, kAXValueCGPointType)
            bounds["x"] = position_value.x
            bounds["y"] = position_value.y

        # Get item size
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size_value = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = size_value.width
            bounds["height"] = size_value.height

        # Add to list
        menubar_items.append(
            {"title": title, "bounds": bounds, "index": i, "app_pid": active_app_pid}
        )

    return menubar_items


@timing_decorator
def get_dock_items() -> List[Dict[str, Any]]:
    """Get all items in the macOS Dock

    Returns:
        List of dictionaries with Dock item information
    """
    dock_items = []

    # Find the Dock process
    dock_pid = None
    running_apps = get_running_apps()
    for app in running_apps:
        if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock":
            dock_pid = app.processIdentifier()
            break

    if dock_pid is None:
        logger.error("Error: Could not find Dock process")
        return dock_items

    # Create an accessibility element for the Dock
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        logger.error(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")
        return dock_items

    # Get the Dock's main element
    dock_list = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_list is None or len(dock_list) == 0:
        logger.error("Error: Could not get Dock children")
        return dock_items

    # Find the Dock's application list (usually the first child)
    dock_app_list = None
    for child in dock_list:
        role = element_attribute(child, kAXRoleAttribute)
        if role == "AXList":
            dock_app_list = child
            break

    if dock_app_list is None:
        logger.error("Error: Could not find Dock application list")
        return dock_items

    # Get all items in the Dock
    items = element_attribute(dock_app_list, kAXChildrenAttribute)
    if items is None:
        logger.error("Error: Could not get Dock items")
        return dock_items

    # Process each Dock item
    for i, item in enumerate(items):
        # Get item attributes
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        description = element_attribute(item, "AXDescription") or ""
        role = element_attribute(item, kAXRoleAttribute) or ""
        subrole = element_attribute(item, "AXSubrole") or ""

        # Create bounding box
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        # Get item position
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            position_value = element_value(position_value, kAXValueCGPointType)
            bounds["x"] = position_value.x
            bounds["y"] = position_value.y

        # Get item size
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size_value = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = size_value.width
            bounds["height"] = size_value.height

        # Determine if this is an application, file/folder, or separator
        item_type = "unknown"
        if subrole == "AXApplicationDockItem":
            item_type = "application"
        elif subrole == "AXFolderDockItem":
            item_type = "folder"
        elif subrole == "AXDocumentDockItem":
            item_type = "document"
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"

        # Add to list
        dock_items.append(
            {
                "title": title,
                "description": description,
                "bounds": bounds,
                "index": i,
                "type": item_type,
                "role": role,
                "subrole": subrole,
            }
        )

    return dock_items


class AppActivationContext:
    def __init__(self, active_app_pid=None, active_app_to_use="", logger=None):
        self.active_app_pid = active_app_pid
        self.active_app_to_use = active_app_to_use
        self.logger = logger
        self.frontmost_app = None

    def __enter__(self):
        from AppKit import NSWorkspace

        if self.active_app_pid:
            if self.logger and self.active_app_to_use:
                self.logger.debug(
                    f"Automatically activating app '{self.active_app_to_use}' for screenshot composition"
                )
            self.frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
            running_apps_list = NSWorkspace.sharedWorkspace().runningApplications()
            for app in running_apps_list:
                if app.processIdentifier() == self.active_app_pid:
                    app.activateWithOptions_(0)
                    # sleep for 0.5 seconds
                    time.sleep(0.5)
                    break
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.frontmost_app:
            # sleep for 0.5 seconds
            time.sleep(0.5)
            self.frontmost_app.activateWithOptions_(0)


def get_frontmost_and_active_app(all_windows, running_apps, app_whitelist):
    from AppKit import NSWorkspace

    frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()

    active_app_to_use = None
    active_app_pid = None

    # Find the topmost (highest z_index) non-filtered app
    for window in reversed(all_windows):
        owner = window.get("owner")
        role = window.get("role")
        is_on_screen = window.get("is_on_screen")

        # Skip non-app windows
        if role != "app":
            continue

        # Skip not-on-screen windows
        if not is_on_screen:
            continue

        # Skip filtered apps
        if app_whitelist is not None and owner not in app_whitelist:
            continue

        # Found a suitable app
        active_app_to_use = owner
        active_app_pid = window.get("pid")
        break

    # If no suitable app found, use Finder
    if active_app_to_use is None:
        active_app_to_use = "Finder"
        for app in running_apps:
            if app.localizedName() == "Finder":
                active_app_pid = app.processIdentifier()
                break

    return frontmost_app, active_app_to_use, active_app_pid


def capture_all_apps(
    save_to_disk: bool = False,
    app_whitelist: List[str] = None,
    output_dir: str = None,
    take_focus: bool = True,
) -> Tuple[Dict[str, Any], Optional[Image.Image]]:
    """Capture screenshots of all running applications

    Args:
        save_to_disk: Whether to save screenshots to disk
        app_whitelist: Optional list of app names to include in the recomposited screenshot
                    (will always include 'Window Server' and 'Dock')

    Returns:
        Dictionary with application information and screenshots
        Optional PIL Image of the recomposited screenshot
    """
    result = {
        "timestamp": time.time(),
        "applications": [],
        "windows": [],  # New array to store all windows, including those without apps
        "menubar_items": [],  # New array to store menubar items
        "dock_items": [],  # New array to store dock items
    }

    # Get all windows with z-order information
    all_windows = get_all_windows()

    # Get all running applications
    running_apps = get_running_apps()

    frontmost_app, active_app_to_use, active_app_pid = (
        get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
        if take_focus
        else (None, None, None)
    )

    # Use AppActivationContext to activate the app and restore focus
    with AppActivationContext(active_app_pid, active_app_to_use, logger):

        # Process applications
        for app in running_apps:
            # Skip system apps without a bundle ID
            if app.bundleIdentifier() is None:
                continue

            app_info = get_app_info(app)
            app_windows = get_app_windows(app.processIdentifier(), all_windows)

            app_data = {"info": app_info, "windows": [window["id"] for window in app_windows]}

            result["applications"].append(app_data)

        # Add all windows to the result
        result["windows"] = all_windows

        # Get menubar items from the active application
        menubar_items = get_menubar_items(active_app_pid)
        result["menubar_items"] = menubar_items

        # Get dock items
        dock_items = get_dock_items()
        result["dock_items"] = dock_items

        # Get menubar bounds
        menubar_bounds = get_menubar_bounds()
        result["menubar_bounds"] = menubar_bounds

        # Get dock bounds
        dock_bounds = get_dock_bounds()
        result["dock_bounds"] = dock_bounds

        # Capture the entire desktop using Quartz compositing
        desktop_screenshot, hitboxes = draw_desktop_screenshot(
            app_whitelist, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items
        )

        result["hitboxes"] = hitboxes

        from PIL import Image, ImageChops, ImageDraw

        def _draw_hitboxes(img, hitboxes, key="target"):
            """
            Overlay opaque colored rectangles for each hitbox (using hitbox[key])
            with color depending on index, then multiply overlay onto img.
            Args:
                img: PIL.Image (RGBA or RGB)
                hitboxes: list of dicts with 'hitbox' and 'target' keys
                key: 'hitbox' or 'target'
            Returns:
                PIL.Image with overlayed hitboxes (same mode/size as input)
            """
            # Ensure RGBA mode for blending
            base = img.convert("RGBA")
            overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(overlay)

            # Distinct colors for order
            colors = [
                (255, 0, 0, 180),  # Red
                (0, 255, 0, 180),  # Green
                (0, 0, 255, 180),  # Blue
                (255, 255, 0, 180),  # Yellow
                (0, 255, 255, 180),  # Cyan
                (255, 0, 255, 180),  # Magenta
                (255, 128, 0, 180),  # Orange
                (128, 0, 255, 180),  # Purple
                (0, 128, 255, 180),  # Sky blue
                (128, 255, 0, 180),  # Lime
            ]
            # Set minimum brightness for colors
            min_brightness = 0
            colors = [
                (
                    max(min_brightness, c[0]),
                    max(min_brightness, c[1]),
                    max(min_brightness, c[2]),
                    c[3],
                )
                for c in colors
            ]

            for i, h in enumerate(hitboxes):
                rect = h.get(key)
                color = colors[i % len(colors)]
                if rect:
                    draw.rectangle(rect, fill=color)

            # Multiply blend overlay onto base
            result = ImageChops.multiply(base, overlay)
            return result

        # DEBUG: Save hitboxes to disk
        if desktop_screenshot and save_to_disk and output_dir:
            desktop_path = os.path.join(output_dir, "desktop.png")
            desktop_screenshot.save(desktop_path)
            result["desktop_screenshot"] = desktop_path

            logger.info(f"Saved desktop screenshot to {desktop_path}")

            if app_whitelist:
                # Take screenshot without whitelist
                desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot(
                    None, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items
                )

                # Draw hitboxes on both images using overlay
                img1 = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                img2 = (
                    _draw_hitboxes(desktop_screenshot_full.copy(), hitboxes, key="target")
                    if desktop_screenshot_full
                    else None
                )

                if img2 and hitboxes_full:

                    # Compose side-by-side
                    from PIL import Image

                    width = img1.width + img2.width
                    height = max(img1.height, img2.height)
                    combined = Image.new("RGBA", (width, height), (0, 0, 0, 0))
                    combined.paste(img1, (0, 0))
                    combined.paste(img2, (img1.width, 0))
                    side_by_side_path = os.path.join(output_dir, "side_by_side_hitboxes.png")
                    combined.save(side_by_side_path)
                    result["side_by_side_hitboxes"] = side_by_side_path
            else:
                # Overlay hitboxes using new function
                hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                hitbox_path = os.path.join(output_dir, "hitboxes.png")
                hitbox_img.save(hitbox_path)
                result["hitbox_screenshot"] = hitbox_path

        # Focus restoration is now handled by AppActivationContext

    return result, desktop_screenshot


async def run_capture():
    """Run the screenshot capture asynchronously"""
    # Parse command line arguments
    parser = argparse.ArgumentParser(
        description="Capture screenshots of running macOS applications"
    )
    parser.add_argument(
        "--output", "-o", help="Output directory for screenshots", default="app_screenshots"
    )
    parser.add_argument(
        "--filter",
        "-f",
        nargs="+",
        help="Filter recomposited screenshot to only include specified apps",
    )
    parser.add_argument(
        "--menubar",
        "-m",
        action="store_true",
        help="List menubar and status items with their bounding boxes",
    )
    parser.add_argument(
        "--dock", "-d", action="store_true", help="List Dock items with their bounding boxes"
    )
    parser.add_argument(
        "--demo",
        nargs="*",
        help="Demo mode: pass app names to capture individual and combinations, create mosaic PNG",
    )
    args = parser.parse_args()

    # Create output directory in the current directory if not absolute
    if not os.path.isabs(args.output):
        output_dir = os.path.join(os.getcwd(), args.output)
    else:
        output_dir = args.output

    # DEMO MODE: capture each app and all non-empty combinations, then mosaic
    if args.demo:
        from PIL import Image

        demo_apps = args.demo
        print(f"Running in DEMO mode for apps: {demo_apps}")
        groups = []
        for item in demo_apps:
            if "/" in item:
                group = [x.strip() for x in item.split("/") if x.strip()]
            else:
                group = [item.strip()]
            if group:
                groups.append(group)
        screenshots = []
        for group in groups:
            print(f"Capturing for apps: {group}")
            _, img = capture_all_apps(app_whitelist=group)
            if img:
                screenshots.append((group, img))
        if not screenshots:
            print("No screenshots captured in demo mode.")
            return

        # Mosaic-pack: grid (rows of sqrt(N))
        def make_mosaic(images, pad=64, bg=(30, 30, 30)):
            import rpack

            sizes = [(img.width + pad, img.height + pad) for _, img in images]
            positions = rpack.pack(sizes)
            # Find the bounding box for the mosaic
            max_x = max(x + w for (x, y), (w, h) in zip(positions, sizes))
            max_y = max(y + h for (x, y), (w, h) in zip(positions, sizes))
            mosaic = Image.new("RGBA", (max_x, max_y), bg)
            for (group, img), (x, y) in zip(images, positions):
                mosaic.paste(img, (x, y))
            return mosaic

        mosaic_img = make_mosaic(screenshots)
        mosaic_path = os.path.join(output_dir, "demo_mosaic.png")
        os.makedirs(output_dir, exist_ok=True)
        mosaic_img.save(mosaic_path)
        print(f"Demo mosaic saved to: {mosaic_path}")
        return

    # Capture all apps and save to disk, including a recomposited screenshot
    print("Capturing screenshots of all running applications...")
    print(f"Saving screenshots to: {output_dir}")

    # If filter is provided, show what we're filtering by
    if args.filter:
        print(
            f"Filtering recomposited screenshot to only include: {', '.join(args.filter)} (plus Window Server and Dock)"
        )

    result, img = capture_all_apps(
        save_to_disk=True, app_whitelist=args.filter, output_dir=output_dir, take_focus=True
    )

    # Print summary
    print("\nCapture complete!")
    print(f"Captured {len(result['applications'])} applications")

    total_app_windows = sum(len(app["windows"]) for app in result["applications"])
    print(f"Total application windows captured: {total_app_windows}")
    print(f"Total standalone windows captured: {len(result['windows'])}")

    # Print details of each application
    print("\nApplication details:")
    for app in result["applications"]:
        app_info = app["info"]
        windows = app["windows"]
        print(f"  - {app_info['name']} ({len(windows)} windows)")

    # Print recomposited screenshot path if available
    if "desktop_screenshot" in result:
        print(f"\nRecomposited screenshot saved to: {result['desktop_screenshot']}")

    # Print menubar items if requested
    if args.menubar and "menubar_items" in result:
        print("\nMenubar items:")

        # Find app name for the PID
        app_name_by_pid = {}
        for app in result["applications"]:
            app_info = app["info"]
            app_name_by_pid[app_info["pid"]] = app_info["name"]

        for item in result["menubar_items"]:
            print(f"  - {item['title']}")
            print(
                f"    Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}"
            )

            if "app_pid" in item:
                app_name = app_name_by_pid.get(
                    item["app_pid"], f"Unknown App (PID: {item['app_pid']})"
                )
                print(f"    App: {app_name} (PID: {item['app_pid']})")

            if "window_id" in item:
                print(f"    Window ID: {item['window_id']}")
            if "owner" in item:
                print(f"    Owner: {item['owner']}")
            if "layer" in item and "z_index" in item:
                print(f"    Layer: {item['layer']}, Z-Index: {item['z_index']}")
            print("")

    # Print dock items if requested
    if args.dock and "dock_items" in result:
        print("\nDock items:")
        for item in result["dock_items"]:
            print(f"  - {item['title']} ({item['type']})")
            print(f"    Description: {item['description']}")
            print(
                f"    Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}"
            )
            print(f"    Role: {item['role']}, Subrole: {item['subrole']}")
            print(f"    Index: {item['index']}")
            print("")

    # Save the metadata to a JSON file
    metadata_path = os.path.join(output_dir, "metadata.json")
    with open(metadata_path, "w") as f:
        json.dump(result, f, indent=2)

    print(f"\nMetadata saved to: {metadata_path}")


if __name__ == "__main__":
    asyncio.run(run_capture())

```

--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/macos.py:
--------------------------------------------------------------------------------

```python
import pyautogui

pyautogui.FAILSAFE = False
import asyncio
import base64
import copy
import json
import logging
import re
import time
from ctypes import POINTER, byref, c_void_p
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple

import AppKit
import Foundation
import objc
from AppKit import NSWorkspace  # type: ignore
from ApplicationServices import AXUIElementCopyAttributeValue  # type: ignore
from ApplicationServices import AXUIElementCopyAttributeValues  # type: ignore
from ApplicationServices import AXUIElementCreateApplication  # type: ignore
from ApplicationServices import AXUIElementCreateSystemWide  # type: ignore
from ApplicationServices import AXUIElementGetTypeID  # type: ignore
from ApplicationServices import AXValueGetType  # type: ignore
from ApplicationServices import AXValueGetValue  # type: ignore
from ApplicationServices import kAXChildrenAttribute  # type: ignore
from ApplicationServices import kAXDescriptionAttribute  # type: ignore
from ApplicationServices import kAXEnabledAttribute  # type: ignore
from ApplicationServices import kAXErrorSuccess  # type: ignore
from ApplicationServices import kAXFocusedApplicationAttribute  # type: ignore
from ApplicationServices import kAXFocusedUIElementAttribute  # type: ignore
from ApplicationServices import kAXFocusedWindowAttribute  # type: ignore
from ApplicationServices import kAXMainWindowAttribute  # type: ignore
from ApplicationServices import kAXPositionAttribute  # type: ignore
from ApplicationServices import kAXRoleAttribute  # type: ignore
from ApplicationServices import kAXRoleDescriptionAttribute  # type: ignore
from ApplicationServices import kAXSelectedTextAttribute  # type: ignore
from ApplicationServices import kAXSelectedTextRangeAttribute  # type: ignore
from ApplicationServices import kAXSizeAttribute  # type: ignore
from ApplicationServices import kAXTitleAttribute  # type: ignore
from ApplicationServices import kAXValueAttribute  # type: ignore
from ApplicationServices import kAXValueCFRangeType  # type: ignore
from ApplicationServices import kAXValueCGPointType  # type: ignore
from ApplicationServices import kAXValueCGSizeType  # type: ignore
from ApplicationServices import kAXVisibleChildrenAttribute  # type: ignore
from ApplicationServices import kAXWindowsAttribute  # type: ignore
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from pynput.mouse import Button
from pynput.mouse import Controller as MouseController
from Quartz.CoreGraphics import *  # type: ignore
from Quartz.CoreGraphics import CGPoint, CGSize  # type: ignore

from .base import BaseAccessibilityHandler, BaseAutomationHandler

logger = logging.getLogger(__name__)

# Trigger accessibility permissions prompt on macOS
try:
    # Source - https://stackoverflow.com/a/17134
    # Posted by Andreas
    # Retrieved 2025-12-03, License - CC BY-SA 4.0
    # Attempt to create and post a mouse event to trigger the permissions prompt
    # This will cause macOS to show "Python would like to control this computer using accessibility features"
    current_pos = CGEventGetLocation(CGEventCreate(None))
    p = CGPoint()
    p.x = current_pos.x
    p.y = current_pos.y

    me = CGEventCreateMouseEvent(None, kCGEventMouseMoved, p, 0)
    if me:
        CGEventPost(kCGHIDEventTap, me)
        CFRelease(me)
except Exception as e:
    logger.debug(f"Failed to trigger accessibility permissions prompt: {e}")

# Trigger screen recording prompt on macOS
try:
    import pyautogui

    pyautogui.screenshot()
except Exception as e:
    logger.debug(f"Failed to trigger screenshot permissions prompt: {e}")


# Constants for accessibility API
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1,  # NSApplicationActivateIgnoringOtherApps
}


def CFAttributeToPyObject(attrValue):
    """Convert Core Foundation attribute values to Python objects.

    Args:
        attrValue: Core Foundation attribute value to convert

    Returns:
        Converted Python object or None if conversion fails
    """

    def list_helper(list_value):
        """Helper function to convert CF arrays to Python lists.

        Args:
            list_value: Core Foundation array to convert

        Returns:
            Python list containing converted items
        """
        list_builder = []
        for item in list_value:
            list_builder.append(CFAttributeToPyObject(item))
        return list_builder

    def number_helper(number_value):
        """Helper function to convert CF numbers to Python numbers.

        Args:
            number_value: Core Foundation number to convert

        Returns:
            Python int or float, or None if conversion fails
        """
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(element_value):
        """Helper function to handle AX UI elements.

        Args:
            element_value: Accessibility UI element to process

        Returns:
            The element value unchanged
        """
        return element_value

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    try:
        return cf_type_mapping[cf_attr_type](attrValue)
    except KeyError:
        # did not get a supported CF type. Move on to AX type
        pass

    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        search_result = re.search("{.*}", attrValue.description())
        if search_result:
            extracted_str = search_result.group()
            return tuple(ax_type_map[ax_attr_type](extracted_str))
        return None
    except KeyError:
        return None


def element_attribute(element, attribute):
    """Get an attribute value from an accessibility element.

    Args:
        element: The accessibility element
        attribute: The attribute name to retrieve

    Returns:
        The attribute value or None if not found
    """
    if attribute == kAXChildrenAttribute:
        err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if err == kAXErrorSuccess:
            if isinstance(value, Foundation.NSArray):  # type: ignore
                return CFAttributeToPyObject(value)
            else:
                return value
    err, value = AXUIElementCopyAttributeValue(element, attribute, None)
    if err == kAXErrorSuccess:
        if isinstance(value, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(value)
        else:
            return value
    return None


def element_value(element, type):
    """Extract a typed value from an accessibility element.

    Args:
        element: The accessibility element containing the value
        type: The expected value type

    Returns:
        The extracted value or None if extraction fails
    """
    err, value = AXValueGetValue(element, type, None)
    if err == True:
        return value
    return None


class UIElement:
    """Represents a UI element in the accessibility tree with position, size, and hierarchy information."""

    def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
        """Initialize a UIElement from an accessibility element.

        Args:
            element: The accessibility element to wrap
            offset_x: X offset for position calculations
            offset_y: Y offset for position calculations
            max_depth: Maximum depth to traverse for children
            parents_visible_bbox: Parent's visible bounding box for clipping
        """
        self.ax_element = element
        self.content_identifier = ""
        self.identifier = ""
        self.name = ""
        self.children = []
        self.description = ""
        self.role_description = ""
        self.value = None
        self.max_depth = max_depth

        # Set role
        self.role = element_attribute(element, kAXRoleAttribute)
        if self.role is None:
            self.role = "No role"

        # Set name
        self.name = element_attribute(element, kAXTitleAttribute)
        if self.name is not None:
            # Convert tuple to string if needed
            if isinstance(self.name, tuple):
                self.name = str(self.name[0]) if self.name else ""
            self.name = self.name.replace(" ", "_")

        # Set enabled
        self.enabled = element_attribute(element, kAXEnabledAttribute)
        if self.enabled is None:
            self.enabled = False

        # Set position and size
        position = element_attribute(element, kAXPositionAttribute)
        size = element_attribute(element, kAXSizeAttribute)
        start_position = element_value(position, kAXValueCGPointType)

        if self.role == "AXWindow" and start_position is not None:
            offset_x = start_position.x
            offset_y = start_position.y

        self.absolute_position = copy.copy(start_position)
        self.position = start_position
        if self.position is not None:
            self.position.x -= max(0, offset_x)
            self.position.y -= max(0, offset_y)
        self.size = element_value(size, kAXValueCGSizeType)

        self._set_bboxes(parents_visible_bbox)

        # Set component center
        if start_position is None or self.size is None:
            print("Position is None")
            return
        self.center = (
            start_position.x + offset_x + self.size.width / 2,
            start_position.y + offset_y + self.size.height / 2,
        )

        self.description = element_attribute(element, kAXDescriptionAttribute)
        self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
        attribute_value = element_attribute(element, kAXValueAttribute)

        # Set value
        self.value = attribute_value
        if attribute_value is not None:
            if isinstance(attribute_value, Foundation.NSArray):  # type: ignore
                self.value = []
                for value in attribute_value:
                    self.value.append(value)
            # Check if it's an accessibility element by checking its type ID
            elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID():  # type: ignore
                self.value = UIElement(attribute_value, offset_x, offset_y)

        # Set children
        if self.max_depth is None or self.max_depth > 0:
            self.children = self._get_children(element, start_position, offset_x, offset_y)
        else:
            self.children = []

        self.calculate_hashes()

    def _set_bboxes(self, parents_visible_bbox):
        """Set bounding box and visible bounding box for the element.

        Args:
            parents_visible_bbox: Parent's visible bounding box for intersection calculation
        """
        if not self.absolute_position or not self.size:
            self.bbox = None
            self.visible_bbox = None
            return
        self.bbox = [
            int(self.absolute_position.x),
            int(self.absolute_position.y),
            int(self.absolute_position.x + self.size.width),
            int(self.absolute_position.y + self.size.height),
        ]
        if parents_visible_bbox:
            # check if not intersected
            if (
                self.bbox[0] > parents_visible_bbox[2]
                or self.bbox[1] > parents_visible_bbox[3]
                or self.bbox[2] < parents_visible_bbox[0]
                or self.bbox[3] < parents_visible_bbox[1]
            ):
                self.visible_bbox = None
            else:
                self.visible_bbox = [
                    int(max(self.bbox[0], parents_visible_bbox[0])),
                    int(max(self.bbox[1], parents_visible_bbox[1])),
                    int(min(self.bbox[2], parents_visible_bbox[2])),
                    int(min(self.bbox[3], parents_visible_bbox[3])),
                ]
        else:
            self.visible_bbox = self.bbox

    def _get_children(self, element, start_position, offset_x, offset_y):
        """Get child elements from the accessibility element.

        Args:
            element: The parent accessibility element
            start_position: Starting position for offset calculations
            offset_x: X offset for child positioning
            offset_y: Y offset for child positioning

        Returns:
            List of UIElement children
        """
        children = element_attribute(element, kAXChildrenAttribute)
        visible_children = element_attribute(element, kAXVisibleChildrenAttribute)
        found_children = []
        if children is not None:
            found_children.extend(children)
        else:
            if visible_children is not None:
                found_children.extend(visible_children)

        result = []
        if self.max_depth is None or self.max_depth > 0:
            for child in found_children:
                child = UIElement(
                    child,
                    offset_x,
                    offset_y,
                    self.max_depth - 1 if self.max_depth is not None else None,
                    self.visible_bbox,
                )
                result.append(child)
        return result

    def calculate_hashes(self):
        """Calculate unique identifiers for the element and its content."""
        self.identifier = self.component_hash()
        self.content_identifier = self.children_content_hash(self.children)

    def component_hash(self):
        """Generate a hash identifier for this component based on its properties.

        Returns:
            MD5 hash string of component properties
        """
        if self.position is None or self.size is None:
            return ""
        position_string = f"{self.position.x:.0f};{self.position.y:.0f}"
        size_string = f"{self.size.width:.0f};{self.size.height:.0f}"
        enabled_string = str(self.enabled)
        # Ensure role is a string
        role_string = ""
        if self.role is not None:
            role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role)
        return self.hash_from_string(position_string + size_string + enabled_string + role_string)

    def hash_from_string(self, string):
        """Generate MD5 hash from a string.

        Args:
            string: Input string to hash

        Returns:
            MD5 hash hexdigest or empty string if input is None/empty
        """
        if string is None or string == "":
            return ""
        from hashlib import md5

        return md5(string.encode()).hexdigest()

    def children_content_hash(self, children):
        """Generate a hash representing the content and structure of child elements.

        Args:
            children: List of child UIElement objects

        Returns:
            Combined hash of children content and structure
        """
        if len(children) == 0:
            return ""
        all_content_hashes = []
        all_hashes = []
        for child in children:
            all_content_hashes.append(child.content_identifier)
            all_hashes.append(child.identifier)
        all_content_hashes.sort()
        if len(all_content_hashes) == 0:
            return ""
        content_hash = self.hash_from_string("".join(all_content_hashes))
        content_structure_hash = self.hash_from_string("".join(all_hashes))
        return self.hash_from_string(content_hash.join(content_structure_hash))

    def to_dict(self):
        """Convert the UIElement to a dictionary representation.

        Returns:
            Dictionary containing all element properties and children
        """

        def children_to_dict(children):
            """Convert list of children to dictionary format.

            Args:
                children: List of UIElement children to convert

            Returns:
                List of dictionaries representing the children
            """
            result = []
            for child in children:
                result.append(child.to_dict())
            return result

        value = self.value
        if isinstance(value, UIElement):
            value = json.dumps(value.to_dict(), indent=4)
        elif isinstance(value, AppKit.NSDate):  # type: ignore
            value = str(value)

        if self.absolute_position is not None:
            absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}"
        else:
            absolute_position = ""

        if self.position is not None:
            position = f"{self.position.x:.2f};{self.position.y:.2f}"
        else:
            position = ""

        if self.size is not None:
            size = f"{self.size.width:.0f};{self.size.height:.0f}"
        else:
            size = ""

        return {
            "id": self.identifier,
            "name": self.name,
            "role": self.role,
            "description": self.description,
            "role_description": self.role_description,
            "value": value,
            "absolute_position": absolute_position,
            "position": position,
            "size": size,
            "enabled": self.enabled,
            "bbox": self.bbox,
            "visible_bbox": self.visible_bbox,
            "children": children_to_dict(self.children),
        }


from pathlib import Path

import Quartz
from AppKit import NSRunningApplication, NSWorkspace


def get_all_windows_zorder():
    """Get all windows in the system with their z-order information.

    Returns:
        List of window dictionaries sorted by z-index, containing window properties
        like id, name, pid, owner, bounds, layer, and opacity
    """
    window_list = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID
    )
    z_order = {
        window["kCGWindowNumber"]: z_index for z_index, window in enumerate(window_list[::-1])
    }
    window_list_all = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID
    )
    windows = []
    for window in window_list_all:
        window_id = window.get("kCGWindowNumber", 0)
        window_name = window.get("kCGWindowName", "")
        window_pid = window.get("kCGWindowOwnerPID", 0)
        window_bounds = window.get("kCGWindowBounds", {})
        window_owner = window.get("kCGWindowOwnerName", "")
        window_is_on_screen = window.get("kCGWindowIsOnscreen", False)
        layer = window.get("kCGWindowLayer", 0)
        opacity = window.get("kCGWindowAlpha", 1.0)
        z_index = z_order.get(window_id, -1)
        if window_name == "Dock" and window_owner == "Dock":
            role = "dock"
        elif window_name == "Menubar" and window_owner == "Window Server":
            role = "menubar"
        elif window_owner in ["Window Server", "Dock"]:
            role = "desktop"
        else:
            role = "app"
        if window_bounds:
            windows.append(
                {
                    "id": window_id,
                    "name": window_name or "Unnamed Window",
                    "pid": window_pid,
                    "owner": window_owner,
                    "role": role,
                    "is_on_screen": window_is_on_screen,
                    "bounds": {
                        "x": window_bounds.get("X", 0),
                        "y": window_bounds.get("Y", 0),
                        "width": window_bounds.get("Width", 0),
                        "height": window_bounds.get("Height", 0),
                    },
                    "layer": layer,
                    "z_index": z_index,
                    "opacity": opacity,
                }
            )
    windows = sorted(windows, key=lambda x: x["z_index"])
    return windows


def get_app_info(app):
    """Extract information from an NSRunningApplication object.

    Args:
        app: NSRunningApplication instance

    Returns:
        Dictionary containing app name, bundle ID, PID, and status flags
    """
    return {
        "name": app.localizedName(),
        "bundle_id": app.bundleIdentifier(),
        "pid": app.processIdentifier(),
        "active": app.isActive(),
        "hidden": app.isHidden(),
        "terminated": app.isTerminated(),
    }


def get_menubar_items(active_app_pid=None):
    """Get menubar items for the active application.

    Args:
        active_app_pid: Process ID of the active application, or None to use frontmost app

    Returns:
        List of menubar item dictionaries with title, bounds, index, and app_pid
    """
    menubar_items = []
    if active_app_pid is None:
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if frontmost_app:
            active_app_pid = frontmost_app.processIdentifier()
        else:
            return menubar_items
    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return menubar_items
    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return menubar_items
    children = element_attribute(menubar, kAXChildrenAttribute)
    if children is None:
        return menubar_items
    for i, item in enumerate(children):
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            position_value = element_value(position_value, kAXValueCGPointType)
            bounds["x"] = getattr(position_value, "x", 0)
            bounds["y"] = getattr(position_value, "y", 0)
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size_value = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = getattr(size_value, "width", 0)
            bounds["height"] = getattr(size_value, "height", 0)
        menubar_items.append(
            {"title": title, "bounds": bounds, "index": i, "app_pid": active_app_pid}
        )
    return menubar_items


def get_dock_items():
    """Get all items in the macOS Dock.

    Returns:
        List of dock item dictionaries with title, description, bounds, index,
        type, role, and subrole information
    """
    dock_items = []
    dock_pid = None
    running_apps = NSWorkspace.sharedWorkspace().runningApplications()
    for app in running_apps:
        if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock":
            dock_pid = app.processIdentifier()
            break
    if dock_pid is None:
        return dock_items
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return dock_items
    dock_list = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_list is None or len(dock_list) == 0:
        return dock_items
    dock_app_list = None
    for child in dock_list:
        role = element_attribute(child, kAXRoleAttribute)
        if role == "AXList":
            dock_app_list = child
            break
    if dock_app_list is None:
        return dock_items
    items = element_attribute(dock_app_list, kAXChildrenAttribute)
    if items is None:
        return dock_items
    for i, item in enumerate(items):
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        description = element_attribute(item, kAXDescriptionAttribute) or ""
        role = element_attribute(item, kAXRoleAttribute) or ""
        subrole = element_attribute(item, "AXSubrole") or ""
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}
        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            position_value = element_value(position_value, kAXValueCGPointType)
            bounds["x"] = getattr(position_value, "x", 0)
            bounds["y"] = getattr(position_value, "y", 0)
        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            size_value = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = getattr(size_value, "width", 0)
            bounds["height"] = getattr(size_value, "height", 0)
        item_type = "unknown"
        if subrole == "AXApplicationDockItem":
            item_type = "application"
        elif subrole == "AXFolderDockItem":
            item_type = "folder"
        elif subrole == "AXDocumentDockItem":
            item_type = "document"
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"
        dock_items.append(
            {
                "title": title,
                "description": description,
                "bounds": bounds,
                "index": i,
                "type": item_type,
                "role": role,
                "subrole": subrole,
            }
        )
    return dock_items


class MacOSAccessibilityHandler(BaseAccessibilityHandler):
    """Handler for macOS accessibility features and UI element inspection."""

    def get_desktop_state(self):
        """Get the current state of the desktop including windows, apps, menubar, and dock.

        Returns:
            Dictionary containing applications, windows, menubar_items, and dock_items
        """
        windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
        running_apps = self.get_running_apps()
        applications = []
        pid_to_window_ids = {}
        # Build a mapping: pid -> list of AX window trees
        pid_to_ax_trees = {}
        for app in running_apps:
            pid = app.processIdentifier()
            try:
                app_elem = AXUIElementCreateApplication(pid)
                err, app_windows = AXUIElementCopyAttributeValue(
                    app_elem, kAXWindowsAttribute, None
                )
                trees = []
                if err == kAXErrorSuccess and app_windows:
                    for ax_win in app_windows:
                        try:
                            trees.append(UIElement(ax_win).to_dict())
                        except Exception as e:
                            trees.append({"error": str(e)})
                pid_to_ax_trees[pid] = trees
            except Exception as e:
                pid_to_ax_trees[pid] = [{"error": str(e)}]
        # Attach children by pid and index (order)
        pid_to_idx = {}
        for win in windows:
            pid = win["pid"]
            idx = pid_to_idx.get(pid, 0)
            ax_trees = pid_to_ax_trees.get(pid, [])
            win["children"] = (
                ax_trees[idx]["children"]
                if idx < len(ax_trees) and "children" in ax_trees[idx]
                else []
            )
            pid_to_idx[pid] = idx + 1
            pid_to_window_ids.setdefault(pid, []).append(win["id"])
        for app in running_apps:
            info = get_app_info(app)
            app_pid = info["pid"]
            applications.append({"info": info, "windows": pid_to_window_ids.get(app_pid, [])})
        menubar_items = get_menubar_items()
        dock_items = get_dock_items()
        return {
            "applications": applications,
            "windows": windows,
            "menubar_items": menubar_items,
            "dock_items": dock_items,
        }

    def get_application_windows(self, pid: int):
        """Get all windows for a specific application.

        Args:
            pid: Process ID of the application

        Returns:
            List of accessibility window elements or empty list if none found
        """
        try:
            app = AXUIElementCreateApplication(pid)
            err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
            if err == kAXErrorSuccess and windows:
                if isinstance(windows, Foundation.NSArray):  # type: ignore
                    return windows
            return []
        except:
            return []

    def get_all_windows(self):
        """Get all visible windows in the system.

        Returns:
            List of window dictionaries with app information and window details
        """
        try:
            windows = []
            running_apps = self.get_running_apps()

            for app in running_apps:
                try:
                    app_name = app.localizedName()
                    pid = app.processIdentifier()

                    # Skip system processes and background apps
                    if not app.activationPolicy() == 0:  # NSApplicationActivationPolicyRegular
                        continue

                    # Get application windows
                    app_windows = self.get_application_windows(pid)

                    windows.append(
                        {
                            "app_name": app_name,
                            "pid": pid,
                            "frontmost": app.isActive(),
                            "has_windows": len(app_windows) > 0,
                            "windows": app_windows,
                        }
                    )
                except:
                    continue

            return windows
        except:
            return []

    def get_running_apps(self):
        """Get all currently running applications.

        Returns:
            List of NSRunningApplication objects
        """
        # From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications
        # "Similar to the NSRunningApplication class's properties, this property will only change when the main run loop runs in a common mode"
        # So we need to run the main run loop to get the latest running applications
        Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False)  # type: ignore
        return NSWorkspace.sharedWorkspace().runningApplications()

    def get_ax_attribute(self, element, attribute):
        """Get an accessibility attribute from an element.

        Args:
            element: The accessibility element
            attribute: The attribute name to retrieve

        Returns:
            The attribute value or None if not found
        """
        return element_attribute(element, attribute)

    def serialize_node(self, element):
        """Create a serializable dictionary representation of an accessibility element.

        Args:
            element: The accessibility element to serialize

        Returns:
            Dictionary containing element properties like role, title, value, position, and size
        """
        # Create a serializable dictionary representation of an accessibility element
        result = {}

        # Get basic attributes
        result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
        result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
        result["value"] = self.get_ax_attribute(element, kAXValueAttribute)

        # Get position and size if available
        position = self.get_ax_attribute(element, kAXPositionAttribute)
        if position:
            try:
                position_dict = {"x": position[0], "y": position[1]}
                result["position"] = position_dict
            except (IndexError, TypeError):
                pass

        size = self.get_ax_attribute(element, kAXSizeAttribute)
        if size:
            try:
                size_dict = {"width": size[0], "height": size[1]}
                result["size"] = size_dict
            except (IndexError, TypeError):
                pass

        return result

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get the complete accessibility tree for the current desktop state.

        Returns:
            Dictionary containing success status and desktop state information
        """
        try:
            desktop_state = self.get_desktop_state()
            return {"success": True, **desktop_state}

        except Exception as e:
            return {"success": False, "error": str(e)}

    async def find_element(
        self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
    ) -> Dict[str, Any]:
        """Find an accessibility element matching the specified criteria.

        Args:
            role: The accessibility role to match (optional)
            title: The title to match (optional)
            value: The value to match (optional)

        Returns:
            Dictionary containing success status and the found element or error message
        """
        try:
            system = AXUIElementCreateSystemWide()

            def match_element(element):
                """Check if an element matches the search criteria.

                Args:
                    element: The accessibility element to check

                Returns:
                    True if element matches all specified criteria, False otherwise
                """
                if role and self.get_ax_attribute(element, kAXRoleAttribute) != role:
                    return False
                if title and self.get_ax_attribute(element, kAXTitleAttribute) != title:
                    return False
                if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value:
                    return False
                return True

            def search_tree(element):
                """Recursively search the accessibility tree for matching elements.

                Args:
                    element: The accessibility element to search from

                Returns:
                    Serialized element dictionary if match found, None otherwise
                """
                if match_element(element):
                    return self.serialize_node(element)

                children = self.get_ax_attribute(element, kAXChildrenAttribute)
                if children:
                    for child in children:
                        result = search_tree(child)
                        if result:
                            return result
                return None

            element = search_tree(system)
            return {"success": True, "element": element}

        except Exception as e:
            return {"success": False, "error": str(e)}


class MacOSAutomationHandler(BaseAutomationHandler):
    """Handler for macOS automation including mouse, keyboard, and screen operations."""

    # Mouse Actions
    mouse = MouseController()
    keyboard = KeyboardController()

    async def mouse_down(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Press and hold a mouse button at the specified coordinates.

        Args:
            x: X coordinate (optional, uses current position if None)
            y: Y coordinate (optional, uses current position if None)
            button: Mouse button to press ("left", "right", or "middle")

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.press(
                Button.left
                if button == "left"
                else Button.right if button == "right" else Button.middle
            )
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Release a mouse button at the specified coordinates.

        Args:
            x: X coordinate (optional, uses current position if None)
            y: Y coordinate (optional, uses current position if None)
            button: Mouse button to release ("left", "right", or "middle")

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.release(
                Button.left
                if button == "left"
                else Button.right if button == "right" else Button.middle
            )
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a left mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional, uses current position if None)
            y: Y coordinate (optional, uses current position if None)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a right mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional, uses current position if None)
            y: Y coordinate (optional, uses current position if None)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.right, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Perform a double left mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional, uses current position if None)
            y: Y coordinate (optional, uses current position if None)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 2)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the mouse cursor to the specified coordinates.

        Args:
            x: Target X coordinate
            y: Target Y coordinate

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.position = (x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from current position to target coordinates.

        Args:
            x: Target X coordinate
            y: Target Y coordinate
            button: Mouse button to use for dragging ("left", "right", or "middle")
            duration: Duration of the drag operation in seconds

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            btn = (
                Button.left
                if button == "left"
                else Button.right if button == "right" else Button.middle
            )
            # Press
            self.mouse.press(btn)
            # Move with sleep to simulate drag duration
            start = self.mouse.position
            steps = 20
            start_x, start_y = start
            dx = (x - start_x) / steps
            dy = (y - start_y) / steps
            for i in range(steps):
                self.mouse.position = (int(start_x + dx * (i + 1)), int(start_y + dy * (i + 1)))
                time.sleep(duration / steps)
            # Release
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            try:
                self.mouse.release(btn)
            except:
                pass
            return {"success": False, "error": str(e)}

    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag the mouse along a specified path of coordinates.

        Args:
            path: List of (x, y) coordinate tuples defining the drag path
            button: Mouse button to use for dragging ("left", "right", or "middle")
            duration: Total duration of the drag operation in seconds

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if not path or len(path) < 2:
                return {"success": False, "error": "Path must contain at least 2 points"}
            btn = (
                Button.left
                if button == "left"
                else Button.right if button == "right" else Button.middle
            )
            # Move to the first point
            self.mouse.position = path[0]
            self.mouse.press(btn)
            step_duration = duration / (len(path) - 1) if len(path) > 1 else duration
            for x, y in path[1:]:
                self.mouse.position = (x, y)
                time.sleep(step_duration)
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            try:
                self.mouse.release(btn)
            except:
                pass
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold a keyboard key.

        Args:
            key: Key name to press (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release a keyboard key.

        Args:
            key: Key name to release (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type text using the keyboard with Unicode support.

        Args:
            text: Text string to type

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release a keyboard key.

        Args:
            key: Key name to press (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press a combination of keys simultaneously.

        Args:
            keys: List of key names to press together (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll the mouse wheel in the specified direction.

        Args:
            x: Horizontal scroll amount
            y: Vertical scroll amount (positive for up, negative for down)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks.

        Args:
            clicks: Number of scroll clicks to perform

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(0, -clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks.

        Args:
            clicks: Number of scroll clicks to perform

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(0, clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture a screenshot of the current screen.

        Returns:
            Dictionary containing success status and base64-encoded image data or error message
        """
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            # Resize image to reduce size (max width 1920, maintain aspect ratio)
            max_width = 1920
            if screenshot.width > max_width:
                ratio = max_width / screenshot.width
                new_height = int(screenshot.height * ratio)
                screenshot = screenshot.resize((max_width, new_height), Image.Resampling.LANCZOS)

            buffered = BytesIO()
            # Use PNG format with optimization to reduce file size
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the dimensions of the current screen.

        Returns:
            Dictionary containing success status and screen size or error message
        """
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current position of the mouse cursor.

        Returns:
            Dictionary containing success status and cursor position or error message
        """
        try:
            x, y = self.mouse.position
            return {"success": True, "position": {"x": x, "y": y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Get the current content of the system clipboard.

        Returns:
            Dictionary containing success status and clipboard content or error message
        """
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the content of the system clipboard.

        Args:
            text: Text to copy to the clipboard

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        Args:
            command: Shell command to execute

        Returns:
            Dictionary containing success status, stdout, stderr, and return code
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Return decoded output
            return {
                "success": True,
                "stdout": stdout.decode() if stdout else "",
                "stderr": stderr.decode() if stderr else "",
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

```
Page 18/20FirstPrevNextLast