This is page 18 of 20. Use http://codebase.md/trycua/cua?page={x} to view the full context.
# Directory Structure
```
├── .cursorignore
├── .dockerignore
├── .editorconfig
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── scripts
│ │ ├── get_pyproject_version.py
│ │ └── tests
│ │ ├── __init__.py
│ │ ├── README.md
│ │ └── test_get_pyproject_version.py
│ └── workflows
│ ├── bump-version.yml
│ ├── ci-lume.yml
│ ├── docker-publish-cua-linux.yml
│ ├── docker-publish-cua-windows.yml
│ ├── docker-publish-kasm.yml
│ ├── docker-publish-xfce.yml
│ ├── docker-reusable-publish.yml
│ ├── link-check.yml
│ ├── lint.yml
│ ├── npm-publish-cli.yml
│ ├── npm-publish-computer.yml
│ ├── npm-publish-core.yml
│ ├── publish-lume.yml
│ ├── pypi-publish-agent.yml
│ ├── pypi-publish-computer-server.yml
│ ├── pypi-publish-computer.yml
│ ├── pypi-publish-core.yml
│ ├── pypi-publish-mcp-server.yml
│ ├── pypi-publish-som.yml
│ ├── pypi-reusable-publish.yml
│ ├── python-tests.yml
│ ├── test-cua-models.yml
│ └── test-validation-script.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .prettierignore
├── .prettierrc.yaml
├── .vscode
│ ├── docs.code-workspace
│ ├── extensions.json
│ ├── launch.json
│ ├── libs-ts.code-workspace
│ ├── lume.code-workspace
│ ├── lumier.code-workspace
│ ├── py.code-workspace
│ └── settings.json
├── blog
│ ├── app-use.md
│ ├── assets
│ │ ├── composite-agents.png
│ │ ├── docker-ubuntu-support.png
│ │ ├── hack-booth.png
│ │ ├── hack-closing-ceremony.jpg
│ │ ├── hack-cua-ollama-hud.jpeg
│ │ ├── hack-leaderboard.png
│ │ ├── hack-the-north.png
│ │ ├── hack-winners.jpeg
│ │ ├── hack-workshop.jpeg
│ │ ├── hud-agent-evals.png
│ │ └── trajectory-viewer.jpeg
│ ├── bringing-computer-use-to-the-web.md
│ ├── build-your-own-operator-on-macos-1.md
│ ├── build-your-own-operator-on-macos-2.md
│ ├── cloud-windows-ga-macos-preview.md
│ ├── composite-agents.md
│ ├── computer-use-agents-for-growth-hacking.md
│ ├── cua-hackathon.md
│ ├── cua-playground-preview.md
│ ├── cua-vlm-router.md
│ ├── hack-the-north.md
│ ├── hud-agent-evals.md
│ ├── human-in-the-loop.md
│ ├── introducing-cua-cli.md
│ ├── introducing-cua-cloud-containers.md
│ ├── lume-to-containerization.md
│ ├── neurips-2025-cua-papers.md
│ ├── sandboxed-python-execution.md
│ ├── training-computer-use-models-trajectories-1.md
│ ├── trajectory-viewer.md
│ ├── ubuntu-docker-support.md
│ └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│ ├── .env.example
│ ├── .gitignore
│ ├── content
│ │ └── docs
│ │ ├── agent-sdk
│ │ │ ├── agent-loops.mdx
│ │ │ ├── benchmarks
│ │ │ │ ├── index.mdx
│ │ │ │ ├── interactive.mdx
│ │ │ │ ├── introduction.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── osworld-verified.mdx
│ │ │ │ ├── screenspot-pro.mdx
│ │ │ │ └── screenspot-v2.mdx
│ │ │ ├── callbacks
│ │ │ │ ├── agent-lifecycle.mdx
│ │ │ │ ├── cost-saving.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── logging.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── pii-anonymization.mdx
│ │ │ │ └── trajectories.mdx
│ │ │ ├── chat-history.mdx
│ │ │ ├── custom-tools.mdx
│ │ │ ├── customizing-computeragent.mdx
│ │ │ ├── integrations
│ │ │ │ ├── hud.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── observability.mdx
│ │ │ ├── mcp-server
│ │ │ │ ├── client-integrations.mdx
│ │ │ │ ├── configuration.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── llm-integrations.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── tools.mdx
│ │ │ │ └── usage.mdx
│ │ │ ├── message-format.mdx
│ │ │ ├── meta.json
│ │ │ ├── migration-guide.mdx
│ │ │ ├── prompt-caching.mdx
│ │ │ ├── supported-agents
│ │ │ │ ├── composed-agents.mdx
│ │ │ │ ├── computer-use-agents.mdx
│ │ │ │ ├── grounding-models.mdx
│ │ │ │ ├── human-in-the-loop.mdx
│ │ │ │ └── meta.json
│ │ │ ├── supported-model-providers
│ │ │ │ ├── cua-vlm-router.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ └── local-models.mdx
│ │ │ ├── telemetry.mdx
│ │ │ └── usage-tracking.mdx
│ │ ├── cli-playbook
│ │ │ ├── commands.mdx
│ │ │ ├── index.mdx
│ │ │ └── meta.json
│ │ ├── computer-sdk
│ │ │ ├── cloud-vm-management.mdx
│ │ │ ├── commands.mdx
│ │ │ ├── computer-server
│ │ │ │ ├── Commands.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── meta.json
│ │ │ │ ├── REST-API.mdx
│ │ │ │ └── WebSocket-API.mdx
│ │ │ ├── computer-ui.mdx
│ │ │ ├── computers.mdx
│ │ │ ├── custom-computer-handlers.mdx
│ │ │ ├── meta.json
│ │ │ ├── sandboxed-python.mdx
│ │ │ └── tracing-api.mdx
│ │ ├── example-usecases
│ │ │ ├── form-filling.mdx
│ │ │ ├── gemini-complex-ui-navigation.mdx
│ │ │ ├── meta.json
│ │ │ ├── post-event-contact-export.mdx
│ │ │ └── windows-app-behind-vpn.mdx
│ │ ├── get-started
│ │ │ ├── meta.json
│ │ │ └── quickstart.mdx
│ │ ├── index.mdx
│ │ ├── macos-vm-cli-playbook
│ │ │ ├── lume
│ │ │ │ ├── cli-reference.mdx
│ │ │ │ ├── faq.md
│ │ │ │ ├── http-api.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ ├── meta.json
│ │ │ │ └── prebuilt-images.mdx
│ │ │ ├── lumier
│ │ │ │ ├── building-lumier.mdx
│ │ │ │ ├── docker-compose.mdx
│ │ │ │ ├── docker.mdx
│ │ │ │ ├── index.mdx
│ │ │ │ ├── installation.mdx
│ │ │ │ └── meta.json
│ │ │ └── meta.json
│ │ └── meta.json
│ ├── next.config.mjs
│ ├── package-lock.json
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── postcss.config.mjs
│ ├── public
│ │ └── img
│ │ ├── agent_gradio_ui.png
│ │ ├── agent.png
│ │ ├── bg-dark.jpg
│ │ ├── bg-light.jpg
│ │ ├── cli.png
│ │ ├── computer.png
│ │ ├── grounding-with-gemini3.gif
│ │ ├── hero.png
│ │ ├── laminar_trace_example.png
│ │ ├── som_box_threshold.png
│ │ └── som_iou_threshold.png
│ ├── README.md
│ ├── source.config.ts
│ ├── src
│ │ ├── app
│ │ │ ├── (home)
│ │ │ │ ├── [[...slug]]
│ │ │ │ │ └── page.tsx
│ │ │ │ └── layout.tsx
│ │ │ ├── api
│ │ │ │ ├── posthog
│ │ │ │ │ └── [...path]
│ │ │ │ │ └── route.ts
│ │ │ │ └── search
│ │ │ │ └── route.ts
│ │ │ ├── favicon.ico
│ │ │ ├── global.css
│ │ │ ├── layout.config.tsx
│ │ │ ├── layout.tsx
│ │ │ ├── llms.mdx
│ │ │ │ └── [[...slug]]
│ │ │ │ └── route.ts
│ │ │ ├── llms.txt
│ │ │ │ └── route.ts
│ │ │ ├── robots.ts
│ │ │ └── sitemap.ts
│ │ ├── assets
│ │ │ ├── discord-black.svg
│ │ │ ├── discord-white.svg
│ │ │ ├── logo-black.svg
│ │ │ └── logo-white.svg
│ │ ├── components
│ │ │ ├── analytics-tracker.tsx
│ │ │ ├── cookie-consent.tsx
│ │ │ ├── doc-actions-menu.tsx
│ │ │ ├── editable-code-block.tsx
│ │ │ ├── footer.tsx
│ │ │ ├── hero.tsx
│ │ │ ├── iou.tsx
│ │ │ ├── mermaid.tsx
│ │ │ └── page-feedback.tsx
│ │ ├── lib
│ │ │ ├── llms.ts
│ │ │ └── source.ts
│ │ ├── mdx-components.tsx
│ │ └── providers
│ │ └── posthog-provider.tsx
│ └── tsconfig.json
├── examples
│ ├── agent_examples.py
│ ├── agent_ui_examples.py
│ ├── browser_tool_example.py
│ ├── cloud_api_examples.py
│ ├── computer_examples_windows.py
│ ├── computer_examples.py
│ ├── computer_ui_examples.py
│ ├── computer-example-ts
│ │ ├── .env.example
│ │ ├── .gitignore
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── helpers.ts
│ │ │ └── index.ts
│ │ └── tsconfig.json
│ ├── docker_examples.py
│ ├── evals
│ │ ├── hud_eval_examples.py
│ │ └── wikipedia_most_linked.txt
│ ├── pylume_examples.py
│ ├── sandboxed_functions_examples.py
│ ├── som_examples.py
│ ├── tracing_examples.py
│ ├── utils.py
│ └── winsandbox_example.py
├── img
│ ├── agent_gradio_ui.png
│ ├── agent.png
│ ├── cli.png
│ ├── computer.png
│ ├── logo_black.png
│ └── logo_white.png
├── libs
│ ├── kasm
│ │ ├── Dockerfile
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── src
│ │ └── ubuntu
│ │ └── install
│ │ └── firefox
│ │ ├── custom_startup.sh
│ │ ├── firefox.desktop
│ │ └── install_firefox.sh
│ ├── lume
│ │ ├── .cursorignore
│ │ ├── CONTRIBUTING.md
│ │ ├── Development.md
│ │ ├── img
│ │ │ └── cli.png
│ │ ├── Package.resolved
│ │ ├── Package.swift
│ │ ├── README.md
│ │ ├── resources
│ │ │ └── lume.entitlements
│ │ ├── scripts
│ │ │ ├── build
│ │ │ │ ├── build-debug.sh
│ │ │ │ ├── build-release-notarized.sh
│ │ │ │ └── build-release.sh
│ │ │ └── install.sh
│ │ ├── src
│ │ │ ├── Commands
│ │ │ │ ├── Clone.swift
│ │ │ │ ├── Config.swift
│ │ │ │ ├── Create.swift
│ │ │ │ ├── Delete.swift
│ │ │ │ ├── Get.swift
│ │ │ │ ├── Images.swift
│ │ │ │ ├── IPSW.swift
│ │ │ │ ├── List.swift
│ │ │ │ ├── Logs.swift
│ │ │ │ ├── Options
│ │ │ │ │ └── FormatOption.swift
│ │ │ │ ├── Prune.swift
│ │ │ │ ├── Pull.swift
│ │ │ │ ├── Push.swift
│ │ │ │ ├── Run.swift
│ │ │ │ ├── Serve.swift
│ │ │ │ ├── Set.swift
│ │ │ │ └── Stop.swift
│ │ │ ├── ContainerRegistry
│ │ │ │ ├── ImageContainerRegistry.swift
│ │ │ │ ├── ImageList.swift
│ │ │ │ └── ImagesPrinter.swift
│ │ │ ├── Errors
│ │ │ │ └── Errors.swift
│ │ │ ├── FileSystem
│ │ │ │ ├── Home.swift
│ │ │ │ ├── Settings.swift
│ │ │ │ ├── VMConfig.swift
│ │ │ │ ├── VMDirectory.swift
│ │ │ │ └── VMLocation.swift
│ │ │ ├── LumeController.swift
│ │ │ ├── Main.swift
│ │ │ ├── Server
│ │ │ │ ├── Handlers.swift
│ │ │ │ ├── HTTP.swift
│ │ │ │ ├── Requests.swift
│ │ │ │ ├── Responses.swift
│ │ │ │ └── Server.swift
│ │ │ ├── Utils
│ │ │ │ ├── CommandRegistry.swift
│ │ │ │ ├── CommandUtils.swift
│ │ │ │ ├── Logger.swift
│ │ │ │ ├── NetworkUtils.swift
│ │ │ │ ├── Path.swift
│ │ │ │ ├── ProcessRunner.swift
│ │ │ │ ├── ProgressLogger.swift
│ │ │ │ ├── String.swift
│ │ │ │ └── Utils.swift
│ │ │ ├── Virtualization
│ │ │ │ ├── DarwinImageLoader.swift
│ │ │ │ ├── DHCPLeaseParser.swift
│ │ │ │ ├── ImageLoaderFactory.swift
│ │ │ │ └── VMVirtualizationService.swift
│ │ │ ├── VM
│ │ │ │ ├── DarwinVM.swift
│ │ │ │ ├── LinuxVM.swift
│ │ │ │ ├── VM.swift
│ │ │ │ ├── VMDetails.swift
│ │ │ │ ├── VMDetailsPrinter.swift
│ │ │ │ ├── VMDisplayResolution.swift
│ │ │ │ └── VMFactory.swift
│ │ │ └── VNC
│ │ │ ├── PassphraseGenerator.swift
│ │ │ └── VNCService.swift
│ │ └── tests
│ │ ├── Mocks
│ │ │ ├── MockVM.swift
│ │ │ ├── MockVMVirtualizationService.swift
│ │ │ └── MockVNCService.swift
│ │ ├── VM
│ │ │ └── VMDetailsPrinterTests.swift
│ │ ├── VMTests.swift
│ │ ├── VMVirtualizationServiceTests.swift
│ │ └── VNCServiceTests.swift
│ ├── lumier
│ │ ├── .dockerignore
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── bin
│ │ │ └── entry.sh
│ │ ├── config
│ │ │ └── constants.sh
│ │ ├── hooks
│ │ │ └── on-logon.sh
│ │ └── lib
│ │ ├── utils.sh
│ │ └── vm.sh
│ ├── python
│ │ ├── agent
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── agent
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── adapters
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── cua_adapter.py
│ │ │ │ │ ├── huggingfacelocal_adapter.py
│ │ │ │ │ ├── human_adapter.py
│ │ │ │ │ ├── mlxvlm_adapter.py
│ │ │ │ │ └── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ └── qwen2_5_vl.py
│ │ │ │ ├── agent.py
│ │ │ │ ├── callbacks
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── budget_manager.py
│ │ │ │ │ ├── image_retention.py
│ │ │ │ │ ├── logging.py
│ │ │ │ │ ├── operator_validator.py
│ │ │ │ │ ├── pii_anonymization.py
│ │ │ │ │ ├── prompt_instructions.py
│ │ │ │ │ ├── telemetry.py
│ │ │ │ │ └── trajectory_saver.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── computers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cua.py
│ │ │ │ │ └── custom.py
│ │ │ │ ├── decorators.py
│ │ │ │ ├── human_tool
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ ├── server.py
│ │ │ │ │ └── ui.py
│ │ │ │ ├── integrations
│ │ │ │ │ └── hud
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── agent.py
│ │ │ │ │ └── proxy.py
│ │ │ │ ├── loops
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── anthropic.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── composed_grounded.py
│ │ │ │ │ ├── gelato.py
│ │ │ │ │ ├── gemini.py
│ │ │ │ │ ├── generic_vlm.py
│ │ │ │ │ ├── glm45v.py
│ │ │ │ │ ├── gta1.py
│ │ │ │ │ ├── holo.py
│ │ │ │ │ ├── internvl.py
│ │ │ │ │ ├── model_types.csv
│ │ │ │ │ ├── moondream3.py
│ │ │ │ │ ├── omniparser.py
│ │ │ │ │ ├── openai.py
│ │ │ │ │ ├── opencua.py
│ │ │ │ │ ├── uiins.py
│ │ │ │ │ ├── uitars.py
│ │ │ │ │ └── uitars2.py
│ │ │ │ ├── proxy
│ │ │ │ │ ├── examples.py
│ │ │ │ │ └── handlers.py
│ │ │ │ ├── responses.py
│ │ │ │ ├── tools
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── browser_tool.py
│ │ │ │ ├── types.py
│ │ │ │ └── ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ └── gradio
│ │ │ │ ├── __init__.py
│ │ │ │ ├── app.py
│ │ │ │ └── ui_components.py
│ │ │ ├── benchmarks
│ │ │ │ ├── .gitignore
│ │ │ │ ├── contrib.md
│ │ │ │ ├── interactive.py
│ │ │ │ ├── models
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ └── gta1.py
│ │ │ │ ├── README.md
│ │ │ │ ├── ss-pro.py
│ │ │ │ ├── ss-v2.py
│ │ │ │ └── utils.py
│ │ │ ├── example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer_agent.py
│ │ ├── bench-ui
│ │ │ ├── bench_ui
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api.py
│ │ │ │ └── child.py
│ │ │ ├── examples
│ │ │ │ ├── folder_example.py
│ │ │ │ ├── gui
│ │ │ │ │ ├── index.html
│ │ │ │ │ ├── logo.svg
│ │ │ │ │ └── styles.css
│ │ │ │ ├── output_overlay.png
│ │ │ │ └── simple_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ └── test_port_detection.py
│ │ ├── computer
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer
│ │ │ │ ├── __init__.py
│ │ │ │ ├── computer.py
│ │ │ │ ├── diorama_computer.py
│ │ │ │ ├── helpers.py
│ │ │ │ ├── interface
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ ├── models.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── logger.py
│ │ │ │ ├── models.py
│ │ │ │ ├── providers
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── cloud
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── docker
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── lume
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── lume_api.py
│ │ │ │ │ ├── lumier
│ │ │ │ │ │ ├── __init__.py
│ │ │ │ │ │ └── provider.py
│ │ │ │ │ ├── types.py
│ │ │ │ │ └── winsandbox
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── provider.py
│ │ │ │ │ └── setup_script.ps1
│ │ │ │ ├── tracing_wrapper.py
│ │ │ │ ├── tracing.py
│ │ │ │ ├── ui
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── __main__.py
│ │ │ │ │ └── gradio
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── app.py
│ │ │ │ └── utils.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_computer.py
│ │ ├── computer-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── computer_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── browser.py
│ │ │ │ ├── cli.py
│ │ │ │ ├── diorama
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── diorama_computer.py
│ │ │ │ │ ├── diorama.py
│ │ │ │ │ ├── draw.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── safezone.py
│ │ │ │ ├── handlers
│ │ │ │ │ ├── base.py
│ │ │ │ │ ├── factory.py
│ │ │ │ │ ├── generic.py
│ │ │ │ │ ├── linux.py
│ │ │ │ │ ├── macos.py
│ │ │ │ │ └── windows.py
│ │ │ │ ├── main.py
│ │ │ │ ├── server.py
│ │ │ │ ├── utils
│ │ │ │ │ ├── __init__.py
│ │ │ │ │ └── wallpaper.py
│ │ │ │ └── watchdog.py
│ │ │ ├── examples
│ │ │ │ ├── __init__.py
│ │ │ │ └── usage_example.py
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ ├── run_server.py
│ │ │ ├── test_connection.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_server.py
│ │ ├── core
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── core
│ │ │ │ ├── __init__.py
│ │ │ │ └── telemetry
│ │ │ │ ├── __init__.py
│ │ │ │ └── posthog.py
│ │ │ ├── poetry.toml
│ │ │ ├── pyproject.toml
│ │ │ ├── README.md
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_telemetry.py
│ │ ├── mcp-server
│ │ │ ├── .bumpversion.cfg
│ │ │ ├── build-extension.py
│ │ │ ├── CONCURRENT_SESSIONS.md
│ │ │ ├── desktop-extension
│ │ │ │ ├── cua-extension.mcpb
│ │ │ │ ├── desktop_extension.png
│ │ │ │ ├── manifest.json
│ │ │ │ ├── README.md
│ │ │ │ ├── requirements.txt
│ │ │ │ ├── run_server.sh
│ │ │ │ └── setup.py
│ │ │ ├── mcp_server
│ │ │ │ ├── __init__.py
│ │ │ │ ├── __main__.py
│ │ │ │ ├── server.py
│ │ │ │ └── session_manager.py
│ │ │ ├── pdm.lock
│ │ │ ├── pyproject.toml
│ │ │ ├── QUICK_TEST_COMMANDS.sh
│ │ │ ├── quick_test_local_option.py
│ │ │ ├── README.md
│ │ │ ├── scripts
│ │ │ │ ├── install_mcp_server.sh
│ │ │ │ └── start_mcp_server.sh
│ │ │ ├── test_mcp_server_local_option.py
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_mcp_server.py
│ │ ├── pylume
│ │ │ └── tests
│ │ │ ├── conftest.py
│ │ │ └── test_pylume.py
│ │ └── som
│ │ ├── .bumpversion.cfg
│ │ ├── LICENSE
│ │ ├── poetry.toml
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ ├── som
│ │ │ ├── __init__.py
│ │ │ ├── detect.py
│ │ │ ├── detection.py
│ │ │ ├── models.py
│ │ │ ├── ocr.py
│ │ │ ├── util
│ │ │ │ └── utils.py
│ │ │ └── visualization.py
│ │ └── tests
│ │ ├── conftest.py
│ │ └── test_omniparser.py
│ ├── qemu-docker
│ │ ├── linux
│ │ │ ├── Dockerfile
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ ├── entry.sh
│ │ │ └── vm
│ │ │ ├── image
│ │ │ │ └── README.md
│ │ │ └── setup
│ │ │ ├── install.sh
│ │ │ ├── setup-cua-server.sh
│ │ │ └── setup.sh
│ │ ├── README.md
│ │ └── windows
│ │ ├── Dockerfile
│ │ ├── README.md
│ │ └── src
│ │ ├── entry.sh
│ │ └── vm
│ │ ├── image
│ │ │ └── README.md
│ │ └── setup
│ │ ├── install.bat
│ │ ├── on-logon.ps1
│ │ ├── setup-cua-server.ps1
│ │ ├── setup-utils.psm1
│ │ └── setup.ps1
│ ├── typescript
│ │ ├── .gitignore
│ │ ├── .nvmrc
│ │ ├── agent
│ │ │ ├── examples
│ │ │ │ ├── playground-example.html
│ │ │ │ └── README.md
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── client.ts
│ │ │ │ ├── index.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ └── client.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── computer
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── computer
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── providers
│ │ │ │ │ │ ├── base.ts
│ │ │ │ │ │ ├── cloud.ts
│ │ │ │ │ │ └── index.ts
│ │ │ │ │ └── types.ts
│ │ │ │ ├── index.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── base.ts
│ │ │ │ │ ├── factory.ts
│ │ │ │ │ ├── index.ts
│ │ │ │ │ ├── linux.ts
│ │ │ │ │ ├── macos.ts
│ │ │ │ │ └── windows.ts
│ │ │ │ └── types.ts
│ │ │ ├── tests
│ │ │ │ ├── computer
│ │ │ │ │ └── cloud.test.ts
│ │ │ │ ├── interface
│ │ │ │ │ ├── factory.test.ts
│ │ │ │ │ ├── index.test.ts
│ │ │ │ │ ├── linux.test.ts
│ │ │ │ │ ├── macos.test.ts
│ │ │ │ │ └── windows.test.ts
│ │ │ │ └── setup.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── core
│ │ │ ├── .editorconfig
│ │ │ ├── .gitattributes
│ │ │ ├── .gitignore
│ │ │ ├── LICENSE
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── index.ts
│ │ │ │ └── telemetry
│ │ │ │ ├── clients
│ │ │ │ │ ├── index.ts
│ │ │ │ │ └── posthog.ts
│ │ │ │ └── index.ts
│ │ │ ├── tests
│ │ │ │ └── telemetry.test.ts
│ │ │ ├── tsconfig.json
│ │ │ ├── tsdown.config.ts
│ │ │ └── vitest.config.ts
│ │ ├── cua-cli
│ │ │ ├── .gitignore
│ │ │ ├── .prettierrc
│ │ │ ├── bun.lock
│ │ │ ├── CLAUDE.md
│ │ │ ├── index.ts
│ │ │ ├── package.json
│ │ │ ├── README.md
│ │ │ ├── src
│ │ │ │ ├── auth.ts
│ │ │ │ ├── cli.ts
│ │ │ │ ├── commands
│ │ │ │ │ ├── auth.ts
│ │ │ │ │ └── sandbox.ts
│ │ │ │ ├── config.ts
│ │ │ │ ├── http.ts
│ │ │ │ ├── storage.ts
│ │ │ │ └── util.ts
│ │ │ └── tsconfig.json
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── pnpm-workspace.yaml
│ │ └── README.md
│ └── xfce
│ ├── .dockerignore
│ ├── .gitignore
│ ├── Development.md
│ ├── Dockerfile
│ ├── Dockerfile.dev
│ ├── README.md
│ └── src
│ ├── scripts
│ │ ├── resize-display.sh
│ │ ├── start-computer-server.sh
│ │ ├── start-novnc.sh
│ │ ├── start-vnc.sh
│ │ └── xstartup.sh
│ ├── supervisor
│ │ └── supervisord.conf
│ └── xfce-config
│ ├── helpers.rc
│ ├── xfce4-power-manager.xml
│ └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│ ├── agent_nb.ipynb
│ ├── blog
│ │ ├── build-your-own-operator-on-macos-1.ipynb
│ │ └── build-your-own-operator-on-macos-2.ipynb
│ ├── composite_agents_docker_nb.ipynb
│ ├── computer_nb.ipynb
│ ├── computer_server_nb.ipynb
│ ├── customizing_computeragent.ipynb
│ ├── eval_osworld.ipynb
│ ├── ollama_nb.ipynb
│ ├── README.md
│ ├── sota_hackathon_cloud.ipynb
│ └── sota_hackathon.ipynb
├── package-lock.json
├── package.json
├── pnpm-lock.yaml
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── install-cli.ps1
│ ├── install-cli.sh
│ ├── playground-docker.sh
│ ├── playground.sh
│ ├── run-docker-dev.sh
│ └── typescript-typecheck.js
├── TESTING.md
├── tests
│ ├── agent_loop_testing
│ │ ├── agent_test.py
│ │ └── README.md
│ ├── pytest.ini
│ ├── shell_cmd.py
│ ├── test_files.py
│ ├── test_mcp_server_session_management.py
│ ├── test_mcp_server_streaming.py
│ ├── test_shell_bash.py
│ ├── test_telemetry.py
│ ├── test_tracing.py
│ ├── test_venv.py
│ └── test_watchdog.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/libs/python/computer/computer/interface/generic.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Tuple
import aiohttp
import websockets
from PIL import Image
from ..logger import Logger, LogLevel
from ..utils import (
bytes_to_image,
decode_base64_image,
draw_box,
encode_base64_image,
resize_image,
)
from .base import BaseComputerInterface
from .models import CommandResult, Key, KeyType, MouseButton
class GenericComputerInterface(BaseComputerInterface):
"""Generic interface with common functionality for all supported platforms (Windows, Linux, macOS)."""
def __init__(
    self,
    ip_address: str,
    username: str = "lume",
    password: str = "lume",
    api_key: Optional[str] = None,
    vm_name: Optional[str] = None,
    logger_name: str = "computer.interface.generic",
    api_port: Optional[int] = None,
):
    """Initialise connection bookkeeping for the interface.

    Args:
        ip_address: Forwarded to the base class constructor.
        username: Forwarded to the base class constructor.
        password: Forwarded to the base class constructor.
        api_key: Forwarded to the base class constructor.
        vm_name: Forwarded to the base class constructor.
        logger_name: Name given to this interface's logger.
        api_port: Optional custom API port; None leaves the port selection
            to the URI properties.
    """
    super().__init__(ip_address, username, password, api_key, vm_name)

    # Logger for this interface
    self.logger = Logger(logger_name, LogLevel.NORMAL)

    # Custom port, if any
    self._api_port = api_port

    # Default pause between commands, in seconds (optional)
    self.delay = 0.0

    # WebSocket connection state
    self._ws = None
    self._reconnect_task = None
    self._closed = False
    self._authenticated = False  # Authentication status
    self._recv_lock = asyncio.Lock()  # Ensures only one recv runs at a time
    self._log_connection_attempts = True  # Controls connection attempt logging

    # Keep-alive timing
    self._last_ping = 0
    self._ping_interval = 5  # Seconds between pings
    self._ping_timeout = 120  # Seconds to wait for a pong response

    # Reconnection back-off
    self._reconnect_delay = 1  # Initial delay of 1 second
    self._max_reconnect_delay = 30  # Upper bound between reconnection attempts
async def _handle_delay(self, delay: Optional[float] = None):
    """Pause between commands using an async sleep.

    Args:
        delay: Optional delay in seconds. If None, ``self.delay`` is used.
            Any non-None value takes precedence over ``self.delay``, so an
            explicit ``0`` skips the pause even when a default is set.
    """
    # An explicit per-call value always overrides the instance default.
    seconds = self.delay if delay is None else delay
    # Check the type first, then the sign. The previous
    # `isinstance(float) or isinstance(int) and > 0` form parsed as
    # `float or (int and > 0)`, so zero/negative floats still hit sleep().
    if isinstance(seconds, (int, float)) and seconds > 0:
        await asyncio.sleep(seconds)
@property
def ws_uri(self) -> str:
    """Build the WebSocket URI from the current IP address.

    Returns:
        WebSocket URI for the Computer API Server
    """
    has_key = bool(self.api_key)
    # A custom API port wins; otherwise the default depends on the API key.
    if self._api_port is not None:
        port = str(self._api_port)
    elif has_key:
        port = "8443"
    else:
        port = "8000"
    scheme = "wss" if has_key else "ws"
    return f"{scheme}://{self.ip_address}:{port}/ws"
@property
def rest_uri(self) -> str:
    """Build the REST URI from the current IP address.

    Returns:
        REST URI for the Computer API Server
    """
    if self.api_key:
        protocol, fallback_port = "https", "8443"
    else:
        protocol, fallback_port = "http", "8000"
    # Prefer the custom API port when one was supplied.
    port = fallback_port if self._api_port is None else str(self._api_port)
    return f"{protocol}://{self.ip_address}:{port}/cmd"
# Mouse actions
async def mouse_down(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: str = "left",
    delay: Optional[float] = None,
) -> None:
    """Send the ``mouse_down`` command, then apply the post-command delay.

    Args:
        x: Optional x coordinate forwarded to the server.
        y: Optional y coordinate forwarded to the server.
        button: Button name forwarded to the server (default "left").
        delay: Optional per-call delay in seconds.
    """
    params = {"x": x, "y": y, "button": button}
    await self._send_command("mouse_down", params)
    await self._handle_delay(delay)
async def mouse_up(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: str = "left",
    delay: Optional[float] = None,
) -> None:
    """Send the ``mouse_up`` command, then apply the post-command delay.

    Args:
        x: Optional x coordinate forwarded to the server.
        y: Optional y coordinate forwarded to the server.
        button: Button name forwarded to the server (default "left").
        delay: Optional per-call delay in seconds.
    """
    params = {"x": x, "y": y, "button": button}
    await self._send_command("mouse_up", params)
    await self._handle_delay(delay)
async def left_click(
    self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
    """Send the ``left_click`` command with optional coordinates, then delay.

    Args:
        x: Optional x coordinate forwarded to the server.
        y: Optional y coordinate forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    target = {"x": x, "y": y}
    await self._send_command("left_click", target)
    await self._handle_delay(delay)
async def right_click(
    self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
    """Send the ``right_click`` command with optional coordinates, then delay.

    Args:
        x: Optional x coordinate forwarded to the server.
        y: Optional y coordinate forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    target = {"x": x, "y": y}
    await self._send_command("right_click", target)
    await self._handle_delay(delay)
async def double_click(
    self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
    """Send the ``double_click`` command with optional coordinates, then delay.

    Args:
        x: Optional x coordinate forwarded to the server.
        y: Optional y coordinate forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    target = {"x": x, "y": y}
    await self._send_command("double_click", target)
    await self._handle_delay(delay)
async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
    """Send the ``move_cursor`` command for the given coordinates, then delay.

    Args:
        x: x coordinate forwarded to the server.
        y: y coordinate forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    destination = {"x": x, "y": y}
    await self._send_command("move_cursor", destination)
    await self._handle_delay(delay)
async def drag_to(
    self,
    x: int,
    y: int,
    button: "MouseButton" = "left",
    duration: float = 0.5,
    delay: Optional[float] = None,
) -> None:
    """Send the ``drag_to`` command, then apply the post-command delay.

    Args:
        x: Destination x coordinate forwarded to the server.
        y: Destination y coordinate forwarded to the server.
        button: Mouse button forwarded to the server (default "left").
        duration: Duration value forwarded to the server (default 0.5).
        delay: Optional per-call delay in seconds.
    """
    params = {"x": x, "y": y, "button": button, "duration": duration}
    await self._send_command("drag_to", params)
    await self._handle_delay(delay)
async def drag(
    self,
    path: List[Tuple[int, int]],
    button: "MouseButton" = "left",
    duration: float = 0.5,
    delay: Optional[float] = None,
) -> None:
    """Send the ``drag`` command for a list of points, then delay.

    Args:
        path: List of (x, y) tuples forwarded to the server as the drag path.
        button: Mouse button forwarded to the server (default "left").
        duration: Duration value forwarded to the server (default 0.5).
        delay: Optional per-call delay in seconds.
    """
    params = {"path": path, "button": button, "duration": duration}
    await self._send_command("drag", params)
    await self._handle_delay(delay)
# Keyboard Actions
async def key_down(self, key: "KeyType", delay: Optional[float] = None) -> None:
    """Send the ``key_down`` command for ``key``, then apply the delay.

    Args:
        key: Key forwarded to the server unchanged.
        delay: Optional per-call delay in seconds.
    """
    payload = {"key": key}
    await self._send_command("key_down", payload)
    await self._handle_delay(delay)
async def key_up(self, key: "KeyType", delay: Optional[float] = None) -> None:
    """Send the ``key_up`` command for ``key``, then apply the delay.

    Args:
        key: Key forwarded to the server unchanged.
        delay: Optional per-call delay in seconds.
    """
    payload = {"key": key}
    await self._send_command("key_up", payload)
    await self._handle_delay(delay)
async def type_text(self, text: str, delay: Optional[float] = None) -> None:
    """Send the ``type_text`` command carrying ``text``, then apply the delay.

    Args:
        text: Text forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    payload = {"text": text}
    await self._send_command("type_text", payload)
    await self._handle_delay(delay)
async def press(self, key: "KeyType", delay: Optional[float] = None) -> None:
    """Press a single key.

    Args:
        key: The key to press. Accepted forms:
            - A Key enum value (recommended), e.g. Key.PAGE_DOWN
            - A direct key value string, e.g. 'pagedown'
            - A single character string, e.g. 'a'
        delay: Optional per-call delay in seconds.

    Examples:
        ```python
        # Enum form (recommended)
        await interface.press(Key.PAGE_DOWN)
        await interface.press(Key.ENTER)
        # Direct value form
        await interface.press('pagedown')
        await interface.press('enter')
        # Single character form
        await interface.press('a')
        ```

    Raises:
        ValueError: If the key type is invalid or the key is not recognized
    """
    if isinstance(key, Key):
        resolved = key.value
    else:
        if not isinstance(key, str):
            raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
        # Strings are mapped to the enum when they match a known key;
        # otherwise whatever Key.from_string returns is sent as-is.
        candidate = Key.from_string(key)
        if isinstance(candidate, Key):
            resolved = candidate.value
        else:
            resolved = candidate
    await self._send_command("press_key", {"key": resolved})
    await self._handle_delay(delay)
async def press_key(self, key: "KeyType", delay: Optional[float] = None) -> None:
    """DEPRECATED alias for press().

    Retained for backward compatibility only and scheduled for removal in a
    future version; new code should call press() directly.

    Args:
        key: Passed through to press().
        delay: Passed through to press().
    """
    # Delegate unchanged so both entry points behave identically.
    await self.press(key, delay)
async def hotkey(self, *keys: "KeyType", delay: Optional[float] = None) -> None:
    """Press multiple keys simultaneously.

    Args:
        *keys: Keys to press together. Each one may be:
            - A Key enum value (recommended), e.g. Key.COMMAND
            - A direct key value string, e.g. 'command'
            - A single character string, e.g. 'a'
        delay: Optional per-call delay in seconds (keyword-only).

    Examples:
        ```python
        # Enum form (recommended)
        await interface.hotkey(Key.COMMAND, Key.C)  # Copy
        await interface.hotkey(Key.COMMAND, Key.V)  # Paste
        # Mixed forms
        await interface.hotkey(Key.COMMAND, 'a')  # Select all
        ```

    Raises:
        ValueError: If any key type is invalid or not recognized
    """
    resolved_keys = []
    for item in keys:
        if isinstance(item, Key):
            resolved_keys.append(item.value)
            continue
        if not isinstance(item, str):
            raise ValueError(f"Invalid key type: {type(item)}. Must be Key enum or string.")
        # Map strings to the enum when they match a known key.
        candidate = Key.from_string(item)
        if isinstance(candidate, Key):
            resolved_keys.append(candidate.value)
        else:
            resolved_keys.append(candidate)
    await self._send_command("hotkey", {"keys": resolved_keys})
    await self._handle_delay(delay)
# Scrolling Actions
async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
    """Send the ``scroll`` command with ``x`` and ``y``, then apply the delay.

    Args:
        x: x value forwarded to the server.
        y: y value forwarded to the server.
        delay: Optional per-call delay in seconds.
    """
    amounts = {"x": x, "y": y}
    await self._send_command("scroll", amounts)
    await self._handle_delay(delay)
async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
    """Send the ``scroll_down`` command, then apply the delay.

    Args:
        clicks: Click count forwarded to the server (default 1).
        delay: Optional per-call delay in seconds.
    """
    payload = {"clicks": clicks}
    await self._send_command("scroll_down", payload)
    await self._handle_delay(delay)
async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
    """Send the ``scroll_up`` command, then apply the delay.

    Args:
        clicks: Click count forwarded to the server (default 1).
        delay: Optional per-call delay in seconds.
    """
    payload = {"clicks": clicks}
    await self._send_command("scroll_up", payload)
    await self._handle_delay(delay)
# Screen actions
async def screenshot(
    self,
    boxes: Optional[List[Tuple[int, int, int, int]]] = None,
    box_color: str = "#FF0000",
    box_thickness: int = 2,
    scale_factor: float = 1.0,
) -> bytes:
    """Capture a screenshot, optionally annotating and rescaling it.

    Args:
        boxes: Optional list of (x, y, width, height) tuples, in screen
            coordinates, to draw on the image
        box_color: Hex colour of the boxes (default: "#FF0000" red)
        box_thickness: Border thickness of the boxes in pixels (default: 2)
        scale_factor: Factor applied to the final image (default: 1.0).
            Values > 1.0 enlarge and values < 1.0 shrink
            (e.g., 0.5 for half size, 2.0 for double)

    Returns:
        bytes: The screenshot image data, with any boxes drawn and scaling applied

    Raises:
        RuntimeError: If the server response carries no image data
    """
    response = await self._send_command("screenshot")
    encoded = response.get("image_data")
    if not encoded:
        raise RuntimeError("Failed to take screenshot, no image data received from server")
    image = decode_base64_image(encoded)

    if boxes:
        # The screenshot may differ in resolution from the reported screen
        # size, so derive the ratio between the two before drawing.
        screen = await self.get_screen_size()
        shot_width, shot_height = bytes_to_image(image).size
        ratio_x = shot_width / screen["width"]
        ratio_y = shot_height / screen["height"]
        for box in boxes:
            # Convert each box from screen space to screenshot space.
            image = draw_box(
                image,
                x=int(box[0] * ratio_x),
                y=int(box[1] * ratio_y),
                width=int(box[2] * ratio_x),
                height=int(box[3] * ratio_y),
                color=box_color,
                thickness=box_thickness,
            )

    if scale_factor == 1.0:
        return image
    return resize_image(image, scale_factor)
async def get_screen_size(self) -> Dict[str, int]:
    """Query the server for the screen size.

    Returns:
        The ``size`` mapping from the server response.

    Raises:
        RuntimeError: If the response does not report success or carries no
            size. Missing keys are treated as failure rather than leaking a
            KeyError.
    """
    result = await self._send_command("get_screen_size")
    size = result.get("size")
    if result.get("success") and size:
        return size
    raise RuntimeError("Failed to get screen size")
async def get_cursor_position(self) -> Dict[str, int]:
    """Query the server for the cursor position.

    Returns:
        The ``position`` mapping from the server response.

    Raises:
        RuntimeError: If the response does not report success or carries no
            position. Missing keys are treated as failure rather than leaking
            a KeyError.
    """
    result = await self._send_command("get_cursor_position")
    position = result.get("position")
    if result.get("success") and position:
        return position
    raise RuntimeError("Failed to get cursor position")
# Clipboard Actions
async def copy_to_clipboard(self) -> str:
    """Fetch the clipboard content from the server.

    Returns:
        The ``content`` value from the server response. An empty string is a
        valid result (empty clipboard) and is returned as-is.

    Raises:
        RuntimeError: If the response does not report success or has no
            ``content`` value. Missing keys are treated as failure rather
            than leaking a KeyError.
    """
    result = await self._send_command("copy_to_clipboard")
    content = result.get("content")
    # Compare against None, not truthiness: "" means the clipboard is empty,
    # which is not an error.
    if result.get("success") and content is not None:
        return content
    raise RuntimeError("Failed to get clipboard content")
async def set_clipboard(self, text: str) -> None:
    """Send the ``set_clipboard`` command carrying ``text``.

    Args:
        text: Text forwarded to the server.
    """
    payload = {"text": text}
    await self._send_command("set_clipboard", payload)
# File Operations
async def _write_bytes_chunked(
    self, path: str, content: bytes, append: bool = False, chunk_size: int = 1024 * 1024
) -> None:
    """Write *content* to *path* as a series of ``write_bytes`` commands.

    Args:
        path: Destination path sent to the server.
        content: Bytes to write.
        append: Append flag used for the first chunk; every later chunk is
            always sent with ``append=True``.
        chunk_size: Maximum number of bytes per command (default 1 MiB).

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        RuntimeError: If the server reports a failure for any chunk.
    """
    if chunk_size <= 0:
        # A non-positive step would never advance through the content.
        raise ValueError("chunk_size must be a positive integer")

    for start in range(0, len(content), chunk_size):
        piece = content[start : start + chunk_size]
        result = await self._send_command(
            "write_bytes",
            {
                "path": path,
                "content_b64": encode_base64_image(piece),
                # First chunk uses the caller's append flag, subsequent chunks always append
                "append": append if start == 0 else True,
            },
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to write file chunk"))
async def write_bytes(self, path: str, content: bytes, append: bool = False) -> None:
    """Write *content* to *path*, delegating to chunked writes above 5MB.

    Args:
        path: Destination path sent to the server.
        content: Bytes to write.
        append: Whether to append instead of overwriting.

    Raises:
        RuntimeError: If the server reports that the write failed.
    """
    chunk_threshold = 5 * 1024 * 1024  # 5MB threshold

    if len(content) <= chunk_threshold:
        # Small enough for a single write_bytes command.
        response = await self._send_command(
            "write_bytes",
            {"path": path, "content_b64": encode_base64_image(content), "append": append},
        )
        if response.get("success", False):
            return
        raise RuntimeError(response.get("error", "Failed to write file"))

    # For large files, use chunked writing
    await self._write_bytes_chunked(path, content, append)
async def _read_bytes_chunked(
    self, path: str, offset: int, total_length: int, chunk_size: int = 1024 * 1024
) -> bytes:
    """Read up to *total_length* bytes from *path* via repeated ``read_bytes`` commands.

    Args:
        path: Path sent to the server.
        offset: Byte offset of the first read.
        total_length: Number of bytes to collect; values <= 0 yield ``b""``.
        chunk_size: Maximum number of bytes requested per command (default 1 MiB).

    Returns:
        The concatenation of all decoded chunks.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        RuntimeError: If the server reports a failure for any chunk.
    """
    if chunk_size <= 0:
        # A non-positive request size would never make progress.
        raise ValueError("chunk_size must be a positive integer")

    pieces = []
    position = offset
    remaining = total_length

    while remaining > 0:
        result = await self._send_command(
            "read_bytes",
            {"path": path, "offset": position, "length": min(chunk_size, remaining)},
        )
        if not result.get("success", False):
            raise RuntimeError(result.get("error", "Failed to read file chunk"))

        piece = decode_base64_image(result.get("content_b64", ""))
        if not piece:
            # Nothing came back (e.g. end of file): stop instead of issuing
            # further requests that cannot return data.
            break

        pieces.append(piece)
        # Advance by what was actually received so a short read does not
        # skip over bytes that have not been fetched yet.
        position += len(piece)
        remaining -= len(piece)

    return b"".join(pieces)
async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes:
    """Read bytes from *path*, switching to chunked reads for large requests.

    Args:
        path: Path sent to the server.
        offset: Byte offset at which reading starts.
        length: Number of bytes to read; ``None`` means "no explicit length".

    Returns:
        The decoded bytes returned by the server.

    Raises:
        RuntimeError: If the server reports that a read (or the size lookup) failed.
    """
    chunk_threshold = 5 * 1024 * 1024  # 5MB threshold

    if length is None:
        # Get file size first to determine if we need chunking
        file_size = await self.get_file_size(path)
        if file_size > chunk_threshold:
            return await self._read_bytes_chunked(
                path, offset, file_size - offset if offset > 0 else file_size
            )
    elif length > chunk_threshold:
        # An explicit large length gets the same chunked treatment. The
        # request is bounded by the bytes available so no chunk is asked for
        # beyond the reported file size.
        file_size = await self.get_file_size(path)
        available = file_size - offset if offset > 0 else file_size
        bounded_length = min(length, available)
        if bounded_length > chunk_threshold:
            return await self._read_bytes_chunked(path, offset, bounded_length)

    result = await self._send_command(
        "read_bytes", {"path": path, "offset": offset, "length": length}
    )
    if not result.get("success", False):
        raise RuntimeError(result.get("error", "Failed to read file"))
    return decode_base64_image(result.get("content_b64", ""))
async def read_text(self, path: str, encoding: str = "utf-8") -> str:
    """Fetch a file through ``read_bytes`` and decode it.

    Args:
        path: Path to the file to read
        encoding: Text encoding used for decoding (default: 'utf-8')

    Returns:
        str: The file content decoded with ``encoding``
    """
    return (await self.read_bytes(path)).decode(encoding)
async def write_text(
    self, path: str, content: str, encoding: str = "utf-8", append: bool = False
) -> None:
    """Encode text and hand it to ``write_bytes``.

    Args:
        path: Path to the file to write
        content: Text content to write
        encoding: Text encoding used for encoding (default: 'utf-8')
        append: Whether to append to the file instead of overwriting
    """
    await self.write_bytes(path, content.encode(encoding), append)
async def get_file_size(self, path: str) -> int:
    """Return the ``size`` field of the ``get_file_size`` reply (0 if absent).

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("get_file_size", {"path": path})
    if response.get("success", False):
        return response.get("size", 0)
    raise RuntimeError(response.get("error", "Failed to get file size"))
async def file_exists(self, path: str) -> bool:
    """Return the ``exists`` field of the ``file_exists`` reply (False if absent)."""
    response = await self._send_command("file_exists", {"path": path})
    found = response.get("exists", False)
    return found
async def directory_exists(self, path: str) -> bool:
    """Return the ``exists`` field of the ``directory_exists`` reply (False if absent)."""
    response = await self._send_command("directory_exists", {"path": path})
    found = response.get("exists", False)
    return found
async def create_dir(self, path: str) -> None:
    """Send the ``create_dir`` command for *path*.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("create_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to create directory"))
async def delete_file(self, path: str) -> None:
    """Send the ``delete_file`` command for *path*.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("delete_file", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete file"))
async def delete_dir(self, path: str) -> None:
    """Send the ``delete_dir`` command for *path*.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("delete_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete directory"))
async def list_dir(self, path: str) -> list[str]:
    """Return the ``files`` field of the ``list_dir`` reply (empty list if absent).

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("list_dir", {"path": path})
    if response.get("success", False):
        return response.get("files", [])
    raise RuntimeError(response.get("error", "Failed to list directory"))
# Desktop actions
async def get_desktop_environment(self) -> str:
    """Return the ``environment`` field of the reply ("unknown" if absent).

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("get_desktop_environment")
    if response.get("success", False):
        return response.get("environment", "unknown")
    raise RuntimeError(response.get("error", "Failed to get desktop environment"))
async def set_wallpaper(self, path: str) -> None:
    """Send the ``set_wallpaper`` command for *path*.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("set_wallpaper", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to set wallpaper"))
# Window management
async def open(self, target: str) -> None:
    """Send the ``open`` command for *target*.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("open", {"target": target})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to open target"))
async def launch(self, app: str, args: list[str] | None = None) -> int | None:
payload: dict[str, object] = {"app": app}
if args is not None:
payload["args"] = args
result = await self._send_command("launch", payload)
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to launch application"))
return result.get("pid") # type: ignore[return-value]
async def get_current_window_id(self) -> int | str:
result = await self._send_command("get_current_window_id")
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get current window id"))
return result["window_id"] # type: ignore[return-value]
async def get_application_windows(self, app: str) -> list[int | str]:
result = await self._send_command("get_application_windows", {"app": app})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get application windows"))
return list(result.get("windows", [])) # type: ignore[return-value]
async def get_window_name(self, window_id: int | str) -> str:
result = await self._send_command("get_window_name", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get window name"))
return result.get("name", "") # type: ignore[return-value]
async def get_window_size(self, window_id: int | str) -> tuple[int, int]:
result = await self._send_command("get_window_size", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get window size"))
return int(result.get("width", 0)), int(result.get("height", 0))
async def get_window_position(self, window_id: int | str) -> tuple[int, int]:
result = await self._send_command("get_window_position", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get window position"))
return int(result.get("x", 0)), int(result.get("y", 0))
async def set_window_size(self, window_id: int | str, width: int, height: int) -> None:
result = await self._send_command(
"set_window_size", {"window_id": window_id, "width": width, "height": height}
)
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to set window size"))
async def set_window_position(self, window_id: int | str, x: int, y: int) -> None:
result = await self._send_command(
"set_window_position", {"window_id": window_id, "x": x, "y": y}
)
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to set window position"))
async def maximize_window(self, window_id: int | str) -> None:
result = await self._send_command("maximize_window", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to maximize window"))
async def minimize_window(self, window_id: int | str) -> None:
result = await self._send_command("minimize_window", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to minimize window"))
async def activate_window(self, window_id: int | str) -> None:
result = await self._send_command("activate_window", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to activate window"))
async def close_window(self, window_id: int | str) -> None:
result = await self._send_command("close_window", {"window_id": window_id})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to close window"))
# Convenience aliases
async def get_window_title(self, window_id: int | str) -> str:
return await self.get_window_name(window_id)
async def window_size(self, window_id: int | str) -> tuple[int, int]:
return await self.get_window_size(window_id)
# Command execution
async def run_command(self, command: str) -> CommandResult:
    """Send the ``run_command`` command and wrap the reply in a ``CommandResult``.

    Args:
        command: Value sent under the ``command`` parameter.

    Returns:
        A ``CommandResult`` built from the reply's ``stdout``, ``stderr`` and
        ``return_code`` fields (defaults: ``""``, ``""`` and ``0``).

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("run_command", {"command": command})
    if response.get("success", False):
        return CommandResult(
            stdout=response.get("stdout", ""),
            stderr=response.get("stderr", ""),
            returncode=response.get("return_code", 0),
        )
    raise RuntimeError(response.get("error", "Failed to run command"))
# Accessibility Actions
async def get_accessibility_tree(self) -> Dict[str, Any]:
    """Send ``get_accessibility_tree`` and return the whole reply dictionary.

    Raises:
        RuntimeError: If the server reports failure.
    """
    response = await self._send_command("get_accessibility_tree")
    if response.get("success", False):
        return response
    raise RuntimeError(response.get("error", "Failed to get accessibility tree"))
async def get_active_window_bounds(self) -> Dict[str, int]:
    """Return the bounds reported by the ``get_active_window_bounds`` command.

    Returns:
        The ``bounds`` mapping from the server reply.

    Raises:
        RuntimeError: If the reply is not successful or carries no bounds.
    """
    result = await self._send_command("get_active_window_bounds")
    # .get() turns an incomplete reply into the documented RuntimeError
    # instead of a KeyError, and preserves the server's error text.
    bounds = result.get("bounds")
    if result.get("success", False) and bounds:
        return bounds
    raise RuntimeError(result.get("error") or "Failed to get active window bounds")
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Map a point from screenshot space into screen space.

    The ratio between the reported screen size and the pixel size of a
    freshly taken screenshot is applied to each axis.

    Args:
        x: X coordinate in screenshot space
        y: Y coordinate in screenshot space

    Returns:
        tuple[float, float]: (x, y) coordinates in screen space
    """
    screen = await self.get_screen_size()
    shot_width, shot_height = bytes_to_image(await self.screenshot()).size

    # Screen pixels per screenshot pixel, per axis.
    horizontal_ratio = screen["width"] / shot_width
    vertical_ratio = screen["height"] / shot_height

    return x * horizontal_ratio, y * vertical_ratio
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Map a point from screen space into screenshot space.

    The ratio between the pixel size of a freshly taken screenshot and the
    reported screen size is applied to each axis.

    Args:
        x: X coordinate in screen space
        y: Y coordinate in screen space

    Returns:
        tuple[float, float]: (x, y) coordinates in screenshot space
    """
    screen = await self.get_screen_size()
    shot_width, shot_height = bytes_to_image(await self.screenshot()).size

    # Screenshot pixels per screen pixel, per axis.
    horizontal_ratio = shot_width / screen["width"]
    vertical_ratio = shot_height / screen["height"]

    return x * horizontal_ratio, y * vertical_ratio
# Playwright browser control
async def playwright_exec(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Execute a Playwright browser command.

    The request is POSTed to ``/playwright_exec`` on ``self.ip_address``:
    HTTPS on port 8443 when an API key is configured, otherwise HTTP on
    port 8000. Failures are reported in the returned dict, not raised.

    Args:
        command: The browser command to execute (visit_url, click, type, scroll, web_search)
        params: Command parameters

    Returns:
        Dict containing the command result, or
        ``{"success": False, "error": ...}`` for a non-200 status or an exception

    Examples:
        # Navigate to a URL
        await interface.playwright_exec("visit_url", {"url": "https://example.com"})

        # Click at coordinates
        await interface.playwright_exec("click", {"x": 100, "y": 200})

        # Type text
        await interface.playwright_exec("type", {"text": "Hello, world!"})

        # Scroll
        await interface.playwright_exec("scroll", {"delta_x": 0, "delta_y": -100})

        # Web search
        await interface.playwright_exec("web_search", {"query": "computer use agent"})
    """
    scheme, port = ("https", "8443") if self.api_key else ("http", "8000")
    endpoint = f"{scheme}://{self.ip_address}:{port}/playwright_exec"

    body = {"command": command, "params": params or {}}
    request_headers = {"Content-Type": "application/json"}
    if self.api_key:
        request_headers["X-API-Key"] = self.api_key
    if self.vm_name:
        request_headers["X-Container-Name"] = self.vm_name

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, json=body, headers=request_headers) as response:
                if response.status != 200:
                    return {"success": False, "error": await response.text()}
                return await response.json()
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# Websocket Methods
async def _keep_alive(self):
    """Keep the WebSocket connection alive with automatic reconnection.

    Runs until ``self._closed`` is set. Each iteration:

    1. (Re)connects when there is no socket or the socket is closed, and
       performs the authentication handshake when both ``api_key`` and
       ``vm_name`` are set. Timeouts and WebSocket errors back off
       exponentially up to ``self._max_reconnect_delay``.
    2. Pings an open socket once ``self._ping_interval`` seconds have passed
       since the last successful ping; a failed ping drops the socket.
    3. Sleeps one second.

    Any other exception drops the socket and the loop carries on.
    """
    retry_count = 0
    log_interval = 500  # After the first attempt, log only every 500th one at INFO/WARNING
    last_warning_time = 0
    min_warning_interval = 30  # Minimum seconds between connection lost warnings
    min_retry_delay = 0.5  # Minimum delay between connection attempts (500ms)

    async def discard_connection() -> None:
        """Close the current socket on a best-effort basis and forget it."""
        if self._ws:
            try:
                await self._ws.close()
            except Exception as close_error:
                # Deliberately best effort. ``Exception`` (not a bare
                # ``except``) so task cancellation still propagates.
                self.logger.debug(f"Ignoring error while closing WebSocket: {close_error}")
        self._ws = None

    while not self._closed:
        try:
            if self._ws is None or (
                self._ws and self._ws.state == websockets.protocol.State.CLOSED
            ):
                try:
                    retry_count += 1

                    # Add a minimum delay between connection attempts to avoid flooding
                    if retry_count > 1:
                        await asyncio.sleep(min_retry_delay)

                    # Only log the first attempt at INFO level, then every Nth attempt
                    if retry_count == 1:
                        self.logger.info(f"Attempting WebSocket connection to {self.ws_uri}")
                    elif retry_count % log_interval == 0:
                        self.logger.info(
                            f"Still attempting WebSocket connection (attempt {retry_count})..."
                        )
                    else:
                        # All other attempts are logged at DEBUG level
                        self.logger.debug(
                            f"Attempting WebSocket connection to {self.ws_uri} (attempt {retry_count})"
                        )

                    self._ws = await asyncio.wait_for(
                        websockets.connect(
                            self.ws_uri,
                            max_size=1024 * 1024 * 10,  # 10MB limit
                            max_queue=32,
                            ping_interval=self._ping_interval,
                            ping_timeout=self._ping_timeout,
                            close_timeout=5,
                            compression=None,  # Disable compression to reduce overhead
                        ),
                        timeout=120,
                    )
                    self.logger.info("WebSocket connection established")

                    # If api_key and vm_name are provided, perform authentication handshake
                    if self.api_key and self.vm_name:
                        self.logger.info("Performing authentication handshake...")
                        auth_message = {
                            "command": "authenticate",
                            "params": {"api_key": self.api_key, "container_name": self.vm_name},
                        }
                        await self._ws.send(json.dumps(auth_message))

                        # Wait for authentication response
                        async with self._recv_lock:
                            auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
                        auth_result = json.loads(auth_response)
                        if not auth_result.get("success"):
                            error_msg = auth_result.get("error", "Authentication failed")
                            self.logger.error(f"Authentication failed: {error_msg}")
                            await self._ws.close()
                            self._ws = None
                            # Not a timeout/WebSocket error, so this is handled
                            # by the outer ``except Exception`` below.
                            raise ConnectionError(f"Authentication failed: {error_msg}")
                        self.logger.info("Authentication successful")

                    self._reconnect_delay = 1  # Reset reconnect delay on successful connection
                    self._last_ping = time.time()
                    retry_count = 0  # Reset retry count on successful connection
                except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e:
                    # Only log the first error at WARNING level, then every Nth attempt
                    if retry_count == 1:
                        self.logger.warning(
                            "Computer API Server not ready yet. Will retry automatically."
                        )
                    elif retry_count % log_interval == 0:
                        self.logger.warning(
                            f"Still waiting for Computer API Server (attempt {retry_count})..."
                        )
                    else:
                        # All other errors are logged at DEBUG level
                        self.logger.debug(f"Connection attempt {retry_count} failed: {e}")

                    await discard_connection()

                    # Use exponential backoff for connection retries
                    await asyncio.sleep(self._reconnect_delay)
                    self._reconnect_delay = min(
                        self._reconnect_delay * 2, self._max_reconnect_delay
                    )
                    continue

            # Regular ping to check connection
            if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                try:
                    if time.time() - self._last_ping >= self._ping_interval:
                        pong_waiter = await self._ws.ping()
                        await asyncio.wait_for(pong_waiter, timeout=self._ping_timeout)
                        self._last_ping = time.time()
                except Exception as e:
                    self.logger.debug(f"Ping failed: {e}")
                    await discard_connection()
                    continue

            await asyncio.sleep(1)

        except Exception as e:
            current_time = time.time()
            # Only log connection lost warnings at most once every min_warning_interval seconds
            if current_time - last_warning_time >= min_warning_interval:
                self.logger.warning(
                    "Computer API Server connection lost. Will retry automatically."
                )
                last_warning_time = current_time
            else:
                # Log at debug level instead
                self.logger.debug(f"Connection lost: {e}")

            await discard_connection()
async def _ensure_connection(self):
    """Ensure WebSocket connection is established.

    Starts the ``_keep_alive`` task when none is running, then checks up to
    five times, one second apart, for a socket in the OPEN state.

    Raises:
        ConnectionError: If no open socket is seen within the allowed checks.
    """
    task = self._reconnect_task
    if task is None or task.done():
        self._reconnect_task = asyncio.create_task(self._keep_alive())

    attempts = 0
    attempt_limit = 5

    while attempts < attempt_limit:
        try:
            socket = self._ws
            if socket and socket.state == websockets.protocol.State.OPEN:
                return

            attempts += 1
            await asyncio.sleep(1)

        except Exception as exc:
            if attempts != attempt_limit - 1:
                self.logger.debug(f"Connection check error (attempt {attempts}): {exc}")
            else:
                # Only log at ERROR level for the last retry attempt
                self.logger.error(
                    f"Persistent connection check error after {attempts} attempts: {exc}"
                )
            attempts += 1
            await asyncio.sleep(1)
            continue

    raise ConnectionError("Failed to establish WebSocket connection after multiple retries")
async def _send_command_ws(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """Send command through WebSocket.

    Makes up to three attempts, one second apart. Each attempt ensures the
    connection, then sends the JSON message and waits (up to 120 seconds)
    for one reply while holding ``self._recv_lock``.

    Args:
        command: Command name placed in the message.
        params: Command parameters; an empty dict is sent when falsy.

    Returns:
        The JSON-decoded reply.

    Raises:
        Exception: The error from the final attempt is re-raised unchanged.
    """
    max_retries = 3
    retry_count = 0
    last_error = None

    while retry_count < max_retries:
        try:
            await self._ensure_connection()
            if not self._ws:
                raise ConnectionError("WebSocket connection is not established")

            message = {"command": command, "params": params or {}}

            # Hold the lock across send *and* recv so only one command is in
            # flight at a time and a reply cannot be paired with a different
            # command's request.
            async with self._recv_lock:
                self.logger.debug(f"Acquired lock for command: {command}")
                await self._ws.send(json.dumps(message))
                response = await asyncio.wait_for(self._ws.recv(), timeout=120)

            self.logger.debug(f"Completed command: {command}")
            return json.loads(response)
        except Exception as e:
            last_error = e
            retry_count += 1
            if retry_count < max_retries:
                # Only log at debug level for intermediate retries
                self.logger.debug(
                    f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
                )
                await asyncio.sleep(1)
                continue
            else:
                # Only log at error level for the final failure
                self.logger.error(
                    f"Failed to send command '{command}' after {max_retries} retries"
                )
                self.logger.debug(f"Command failure details: {e}")
                raise

    # Defensive fallback: every loop iteration above either returns or raises.
    raise last_error if last_error else RuntimeError("Failed to send command")
async def _send_command_rest(
    self, command: str, params: Optional[Dict] = None
) -> Dict[str, Any]:
    """Send command through REST API without retries or connection management.

    The reply body is expected to be ``data: `` followed by JSON. This
    method never raises: every failure is returned as a dict with
    ``success`` set to ``False``.

    Args:
        command: Command name placed in the payload.
        params: Command parameters; an empty dict is sent when falsy.

    Returns:
        The decoded JSON reply, or an error dict whose ``error`` is
        ``"Server returned malformed response"`` or ``"Request failed"``.
    """
    marker = "data: "
    try:
        body = {"command": command, "params": params or {}}

        request_headers = {"Content-Type": "application/json"}
        if self.api_key:
            request_headers["X-API-Key"] = self.api_key
        if self.vm_name:
            request_headers["X-Container-Name"] = self.vm_name

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.rest_uri, json=body, headers=request_headers
            ) as response:
                text = (await response.text()).strip()

                if text.startswith(marker):
                    try:
                        # Everything after the "data: " prefix is the JSON document.
                        return json.loads(text[len(marker) :])
                    except json.JSONDecodeError:
                        pass

                # Missing prefix or undecodable JSON.
                return {
                    "success": False,
                    "error": "Server returned malformed response",
                    "message": text,
                }
    except Exception as exc:
        return {"success": False, "error": "Request failed", "message": str(exc)}
async def _send_command(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """Send command using REST API with WebSocket fallback.

    The WebSocket path is tried only when the REST result is unsuccessful
    with an ``error`` of ``"Request failed"`` or
    ``"Server returned malformed response"``. If the fallback raises, the
    original REST result is returned.
    """
    transport_errors = ("Request failed", "Server returned malformed response")

    rest_result = await self._send_command_rest(command, params)

    rest_succeeded = rest_result.get("success", True)
    if rest_succeeded or rest_result.get("error") not in transport_errors:
        return rest_result

    self.logger.warning(
        f"REST API failed for command '{command}', trying WebSocket fallback"
    )
    try:
        return await self._send_command_ws(command, params)
    except Exception as exc:
        self.logger.error(f"WebSocket fallback also failed: {exc}")
        # Return the original REST error
        return rest_result
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
    """Wait for Computer API Server to be ready by testing version command.

    A single REST ``version`` probe decides the transport. If it fails, the
    wait is delegated to ``_wait_for_ready_ws``. Otherwise ``get_screen_size``
    is polled through ``_send_command`` until it succeeds or *timeout* elapses.

    Args:
        timeout: Maximum number of seconds to poll.
        interval: Seconds to sleep between polling attempts.

    Raises:
        TimeoutError: If the server did not answer successfully in time.
        RuntimeError: If an unexpected error occurs while waiting.
    """
    # Check if REST API is available. This is an explicit check rather than
    # an ``assert`` so the fallback decision also works when Python runs
    # with assertions disabled (-O).
    rest_failure = None
    try:
        probe = await self._send_command_rest("version", {})
        if not probe.get("success", True):
            rest_failure = (
                probe.get("message") or probe.get("error") or "REST probe reported failure"
            )
    except Exception as e:
        rest_failure = e

    if rest_failure is not None:
        self.logger.debug(
            f"REST API failed for command 'version', trying WebSocket fallback: {rest_failure}"
        )
        try:
            await self._wait_for_ready_ws(timeout, interval)
            return
        except Exception as e:
            self.logger.debug(f"WebSocket fallback also failed: {e}")
            raise

    start_time = time.time()
    last_error = None
    attempt_count = 0
    progress_interval = 10  # Log progress every 10 seconds
    last_progress_time = start_time

    try:
        self.logger.info(
            f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
        )

        # Wait for the server to respond to get_screen_size command
        while time.time() - start_time < timeout:
            try:
                attempt_count += 1
                current_time = time.time()

                # Log progress periodically without flooding logs
                if current_time - last_progress_time >= progress_interval:
                    elapsed = current_time - start_time
                    self.logger.info(
                        f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                    )
                    last_progress_time = current_time

                # Test the server with a simple get_screen_size command
                result = await self._send_command("get_screen_size")
                if result.get("success", False):
                    elapsed = time.time() - start_time
                    self.logger.info(
                        f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                    )
                    return  # Server is ready
                else:
                    last_error = result.get("error", "Unknown error")
                    self.logger.debug(f"Initial connection command failed: {last_error}")

            except Exception as e:
                last_error = e
                self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")

            # Wait before trying again
            await asyncio.sleep(interval)

        # If we get here, we've timed out
        error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
        if last_error:
            error_msg += f": {str(last_error)}"
        self.logger.error(error_msg)
        raise TimeoutError(error_msg)

    except TimeoutError:
        raise
    except Exception as e:
        error_msg = f"Error while waiting for server: {str(e)}"
        self.logger.error(error_msg)
        # Chain the cause so the original traceback is preserved.
        raise RuntimeError(error_msg) from e
async def _wait_for_ready_ws(self, timeout: int = 60, interval: float = 1.0):
    """Wait for WebSocket connection to become available.

    Starts the ``_keep_alive`` task if needed, then polls until an open
    socket answers a ``get_screen_size`` command or *timeout* elapses.

    Args:
        timeout: Maximum number of seconds to poll.
        interval: Seconds to sleep between polling attempts.

    Raises:
        TimeoutError: If no working connection was observed in time.
    """
    start_time = time.time()
    last_error = None
    attempt_count = 0
    progress_interval = 10  # Log progress every 10 seconds
    last_progress_time = start_time

    # Disable detailed logging for connection attempts, remembering the
    # prior setting so it can be restored afterwards.
    previous_log_setting = getattr(self, "_log_connection_attempts", False)
    self._log_connection_attempts = False

    try:
        self.logger.info(
            f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
        )

        # Start the keep-alive task if it's not already running
        if self._reconnect_task is None or self._reconnect_task.done():
            self._reconnect_task = asyncio.create_task(self._keep_alive())

        # Wait for the connection to be established
        while time.time() - start_time < timeout:
            try:
                attempt_count += 1
                current_time = time.time()

                # Log progress periodically without flooding logs
                if current_time - last_progress_time >= progress_interval:
                    elapsed = current_time - start_time
                    self.logger.info(
                        f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
                    )
                    last_progress_time = current_time

                # Check if we have a connection
                if self._ws and self._ws.state == websockets.protocol.State.OPEN:
                    # Test the connection with a simple command
                    try:
                        await self._send_command_ws("get_screen_size")
                        elapsed = time.time() - start_time
                        self.logger.info(
                            f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
                        )
                        return  # Connection is fully working
                    except Exception as e:
                        last_error = e
                        self.logger.debug(f"Connection test failed: {e}")

                # Wait before trying again
                await asyncio.sleep(interval)

            except Exception as e:
                last_error = e
                self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")
                await asyncio.sleep(interval)

        # If we get here, we've timed out
        error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
        if last_error:
            error_msg += f": {str(last_error)}"
        self.logger.error(error_msg)
        raise TimeoutError(error_msg)
    finally:
        # Restore whatever logging behavior was configured before this call.
        self._log_connection_attempts = previous_log_setting
def close(self):
    """Close WebSocket connection.

    Note: In host computer server mode, we leave the connection open
    to allow other clients to connect to the same server. The server
    will handle cleaning up idle connections.

    Only the reconnect task is cancelled here. The ``_closed`` flag and the
    socket itself are left untouched; use ``force_close`` for a full shutdown.
    """
    task = self._reconnect_task
    if task:
        task.cancel()
def force_close(self):
    """Force close the WebSocket connection.

    This method should be called when you want to completely
    shut down the connection, not just for regular cleanup.

    Sets ``_closed``, cancels the reconnect task, schedules the socket's
    ``close()`` with ``asyncio.create_task`` and clears ``_ws``.
    """
    self._closed = True

    task = self._reconnect_task
    if task:
        task.cancel()

    socket = self._ws
    if socket:
        # Closing is scheduled, not awaited: this method is synchronous.
        asyncio.create_task(socket.close())
        self._ws = None
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/diorama/draw.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Diorama Renderer - A tool for rendering selective views of macOS desktops
This script renders filtered views of the macOS desktop, preserving only selected applications
while maintaining system UI elements like menubar and dock. Each "diorama" shows a consistent
view of the system while isolating specific applications.
The image is "smart resized" to remove any empty space around the menubar and dock.
Key features:
- Captures shared window state, z-order and position information
- Filters windows by application based on whitelist
- Preserves system context (menubar, dock) in each view
- Preserves menu-owning / keyboard-focused window in each view
- Supports parallel views of the same desktop for multi-agent systems
"""
import argparse
import asyncio
import functools
import io
import json
import logging
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image, ImageDraw
# simple, nicely formatted logging
logger = logging.getLogger(__name__)
from computer_server.diorama.safezone import (
get_dock_bounds,
get_menubar_bounds,
)
# Timing decorator for profiling
def timing_decorator(func):
    """Wrap *func* so each call logs its wall-clock duration at DEBUG level.

    The wrapped function's return value is passed through unchanged, and its
    metadata is preserved via ``functools.wraps``.
    """

    @functools.wraps(func)
    def timed(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        elapsed_time = time.time() - began
        logger.debug(f"Function {func.__name__} took {elapsed_time:.4f} seconds to run")
        return outcome

    return timed
# Import Objective-C bridge libraries
try:
import AppKit
import Foundation
import objc
import Quartz
from AppKit import NSApp, NSApplication, NSRunningApplication, NSWorkspace
from ApplicationServices import AXUIElementCopyAttributeValue # type: ignore
from ApplicationServices import AXUIElementCopyAttributeValues # type: ignore
from ApplicationServices import AXUIElementCreateApplication # type: ignore
from ApplicationServices import AXUIElementCreateSystemWide # type: ignore
from ApplicationServices import AXUIElementGetTypeID # type: ignore
from ApplicationServices import AXValueGetType # type: ignore
from ApplicationServices import AXValueGetValue # type: ignore
from ApplicationServices import kAXChildrenAttribute # type: ignore
from ApplicationServices import kAXDescriptionAttribute # type: ignore
from ApplicationServices import kAXEnabledAttribute # type: ignore
from ApplicationServices import kAXErrorSuccess # type: ignore
from ApplicationServices import kAXFocusedApplicationAttribute # type: ignore
from ApplicationServices import kAXFocusedUIElementAttribute # type: ignore
from ApplicationServices import kAXFocusedWindowAttribute # type: ignore
from ApplicationServices import kAXMainWindowAttribute # type: ignore
from ApplicationServices import kAXPositionAttribute # type: ignore
from ApplicationServices import kAXRoleAttribute # type: ignore
from ApplicationServices import kAXRoleDescriptionAttribute # type: ignore
from ApplicationServices import kAXSelectedTextAttribute # type: ignore
from ApplicationServices import kAXSelectedTextRangeAttribute # type: ignore
from ApplicationServices import kAXSizeAttribute # type: ignore
from ApplicationServices import kAXTitleAttribute # type: ignore
from ApplicationServices import kAXValueAttribute # type: ignore
from ApplicationServices import kAXValueCFRangeType # type: ignore
from ApplicationServices import kAXValueCGPointType # type: ignore
from ApplicationServices import kAXValueCGSizeType # type: ignore
from ApplicationServices import kAXVisibleChildrenAttribute # type: ignore
from ApplicationServices import kAXWindowsAttribute # type: ignore
from Foundation import NSMakeRect, NSObject
except ImportError:
logger.error("Error: This script requires PyObjC to be installed.")
logger.error("Please install it with: pip install pyobjc")
sys.exit(1)
# Constants for accessibility API
# NOTE: kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute, kAXValueAttribute,
# kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute and
# kAXChildrenAttribute are also imported from ApplicationServices above; the
# assignments below run afterwards and rebind those module-level names.
# NOTE(review): presumably the literals mirror the framework values - confirm.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"
# Constants for window properties
# (string keys; NOTE(review): presumably used to index window-info dictionaries - confirm)
kCGWindowLayer = "kCGWindowLayer" # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha" # Window opacity
# Constants for application activation options
# Maps a readable option name to its bit-flag value; despite the CamelCase
# name this is a plain dict, not a class.
NSApplicationActivationOptions = {
"regular": 0, # Default activation
"bringing_all_windows_forward": 1 << 0, # NSApplicationActivateAllWindows
"ignoring_other_apps": 1 << 1, # NSApplicationActivateIgnoringOtherApps
}
def CFAttributeToPyObject(attrValue):
    """Convert a CoreFoundation / accessibility attribute value to a Python object.

    Handling by type:
      * CFString -> ``str``, CFBoolean -> ``bool``
      * CFArray -> ``list`` with every item converted recursively
      * CFNumber -> ``int`` if it converts as an int, else ``float``, else ``None``
      * AXUIElement -> returned unchanged
      * AXValue of CGSize / CGPoint / CFRange type -> ``tuple`` parsed from the
        ``{...}`` portion of the value's ``description()``

    Args:
        attrValue: The attribute value to convert.

    Returns:
        The converted object, or ``None`` when the type is not supported or
        the description contains no ``{...}`` section.
    """
    # Imported locally: this function needs ``re`` and the module's top-level
    # imports do not provide it.
    import re

    def list_helper(list_value):
        return [CFAttributeToPyObject(item) for item in list_value]

    def number_helper(number_value):
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(element_value):
        return element_value

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    try:
        return cf_type_mapping[cf_attr_type](attrValue)
    except KeyError:
        # did not get a supported CF type. Move on to AX type
        pass

    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        # The struct is recovered from the "{...}" part of the textual description.
        search_result = re.search("{.*}", attrValue.description())
        if search_result:
            extracted_str = search_result.group()
            return tuple(ax_type_map[ax_attr_type](extracted_str))
        return None
    except KeyError:
        return None
def element_attribute(element, attribute):
    """Read an accessibility attribute from an element.

    For the children attribute a ranged copy (indices 0..999) is attempted
    first; if that does not succeed, or for any other attribute, the plain
    single-value copy is used. NSArray results are converted with
    CFAttributeToPyObject; other results are returned as-is.

    Args:
        element: Accessibility element to query
        attribute: Name of the attribute to read

    Returns:
        The attribute value, or None when the attribute could not be read
    """

    def _unwrap(raw):
        # Only NSArray results are converted to Python objects.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, children = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return _unwrap(children)

    status, raw_value = AXUIElementCopyAttributeValue(element, attribute, None)
    if status != kAXErrorSuccess:
        return None
    return _unwrap(raw_value)
def element_value(element, type):
    """Unpack an AXValue wrapper via AXValueGetValue.

    Args:
        element: The AXValue object to unpack
        type: The AX value type constant describing the wrapped structure

    Returns:
        The unpacked value, or None when AXValueGetValue does not report success
    """
    ok, unpacked = AXValueGetValue(element, type, None)
    # Explicit comparison kept on purpose: only a flag equal to True counts.
    return unpacked if ok == True else None  # noqa: E712
@timing_decorator
def get_running_apps() -> List[NSRunningApplication]:
    """Return the running applications reported by the shared NSWorkspace.

    Returns:
        List of NSRunningApplication objects
    """
    workspace = NSWorkspace.sharedWorkspace()
    return workspace.runningApplications()
# @timing_decorator
def get_app_info(app: NSRunningApplication) -> Dict[str, Any]:
    """Summarise a running application as a plain dictionary.

    Args:
        app: NSRunningApplication object

    Returns:
        Dictionary with the keys "name", "bundle_id", "pid", "active",
        "hidden" and "terminated", each read from the corresponding accessor
    """
    return dict(
        name=app.localizedName(),
        bundle_id=app.bundleIdentifier(),
        pid=app.processIdentifier(),
        active=app.isActive(),
        hidden=app.isHidden(),
        terminated=app.isTerminated(),
    )
def _classify_window_role(window_name: str, window_owner: str) -> str:
    """Classify a window as "dock", "menubar", "desktop" or "app" from its name and owner."""
    if window_name == "Dock" and window_owner == "Dock":
        return "dock"
    if window_name == "Menubar" and window_owner == "Window Server":
        return "menubar"
    if window_owner in ("Window Server", "Dock"):
        return "desktop"
    return "app"


@timing_decorator
def get_all_windows() -> List[Dict[str, Any]]:
    """Get all windows from all applications with z-order information

    Two Quartz queries are combined: the on-screen-only list (whose order is
    used to derive a z-index) and the full list (which supplies every window).
    Windows without bounds are dropped. Windows missing from the on-screen
    list receive a z_index of -1.

    Returns:
        List of window dictionaries sorted by ascending "z_index". Each entry
        has the keys "id", "name", "pid", "owner", "role", "is_on_screen",
        "bounds" (x/y/width/height), "layer", "z_index" and "opacity".
        An empty list is returned when Quartz provides no window information.
    """
    # The kCGWindowListOptionOnScreenOnly flag gets only visible windows with preserved z-order
    on_screen_windows = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID
    )
    # Quartz hands back NULL (None) when no window server session is
    # available; treat that as "no windows" instead of crashing on the slice.
    if on_screen_windows is None:
        on_screen_windows = []

    # Reverse so that index 0 is the bottom-most window (desktop / background)
    z_order = {
        window["kCGWindowNumber"]: z_index
        for z_index, window in enumerate(on_screen_windows[::-1])
    }

    # The kCGWindowListOptionAll flag gets all windows *without* z-order preserved
    every_window = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID
    )
    if every_window is None:
        every_window = []

    windows = []
    for window in every_window:
        window_bounds = window.get("kCGWindowBounds", {})
        # Only include windows with valid bounds
        if not window_bounds:
            continue

        window_id = window.get("kCGWindowNumber", 0)
        window_name = window.get("kCGWindowName", "")
        window_owner = window.get("kCGWindowOwnerName", "")

        windows.append(
            {
                "id": window_id,
                "name": window_name or "Unnamed Window",
                "pid": window.get("kCGWindowOwnerPID", 0),
                "owner": window_owner,
                "role": _classify_window_role(window_name, window_owner),
                "is_on_screen": window.get("kCGWindowIsOnscreen", False),
                "bounds": {
                    "x": window_bounds.get("X", 0),
                    "y": window_bounds.get("Y", 0),
                    "width": window_bounds.get("Width", 0),
                    "height": window_bounds.get("Height", 0),
                },
                "layer": window.get(kCGWindowLayer, 0),  # System window layer as reported by Quartz
                "z_index": z_order.get(window_id, -1),  # Our z-index (0 is the desktop)
                "opacity": window.get(kCGWindowAlpha, 1.0),
            }
        )

    windows.sort(key=lambda entry: entry["z_index"])
    return windows
def get_app_windows(app_pid: int, all_windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Select the windows owned by one process

    Args:
        app_pid: Process ID of the application
        all_windows: List of all windows with z-order information

    Returns:
        The entries of all_windows whose "pid" equals app_pid, in their
        original order
    """

    def _owned_by_app(window: Dict[str, Any]) -> bool:
        return window["pid"] == app_pid

    return list(filter(_owned_by_app, all_windows))
@timing_decorator
def draw_desktop_screenshot(
    app_whitelist: Optional[List[str]] = None,
    all_windows: Optional[List[Dict[str, Any]]] = None,
    dock_bounds: Optional[Dict[str, float]] = None,
    dock_items: Optional[List[Dict[str, Any]]] = None,
    menubar_bounds: Optional[Dict[str, float]] = None,
    menubar_items: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[Optional[Image.Image], List[Dict[str, Any]]]:
    """Capture a screenshot of the entire desktop using Quartz compositing, including dock as a second pass.

    Without a whitelist the on-screen windows are rendered in a single pass at
    full screen size. With a whitelist the image is cropped around the
    whitelisted app windows and composed in three passes (desktop + apps,
    menubar, then the dock items that survive the whitelist, packed together
    and centred).

    Args:
        app_whitelist: Optional list of app names to include in the screenshot
        all_windows: Window dictionaries as produced by get_all_windows();
            fetched when omitted
        dock_bounds: Dock bounds dictionary; fetched via get_dock_bounds() when omitted
        dock_items: Dock item dictionaries; fetched via get_dock_items() when omitted
        menubar_bounds: Menubar bounds dictionary; fetched via get_menubar_bounds() when omitted
        menubar_items: Menubar item dictionaries; fetched via get_menubar_items() when omitted

    Returns:
        Tuple of (PIL Image of the desktop or None if capture failed, list of
        hitboxes). Each hitbox is a dict whose "hitbox" entry is an
        [x1, y1, x2, y2] rectangle in screenshot coordinates and whose
        "target" entry is the matching rectangle in screen coordinates.
    """
    if dock_bounds is None:
        dock_bounds = get_dock_bounds()
    if dock_items is None:
        dock_items = get_dock_items()
    if menubar_bounds is None:
        menubar_bounds = get_menubar_bounds()
    if menubar_items is None:
        menubar_items = get_menubar_items()
    if all_windows is None:
        all_windows = get_all_windows()

    # Reverse the z-ordered list and keep only windows that are on screen
    all_windows = all_windows[::-1]
    all_windows = [window for window in all_windows if window["is_on_screen"]]

    main_screen = AppKit.NSScreen.mainScreen()
    if not main_screen:
        # Every code path below needs the screen frame; without a main screen
        # there is nothing to composite, so report a failed capture.
        logger.error("Error: Could not determine main screen")
        return None, []
    frame = main_screen.frame()
    screen_rect = Quartz.CGRectMake(0, 0, frame.size.width, frame.size.height)

    # Screenshot-to-screen hitboxes
    hitboxes = []

    if app_whitelist is None:
        # Single pass: desktop, menubar, app, dock
        window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
        for window in all_windows:
            Foundation.CFArrayAppendValue(window_list, window["id"])
        cg_image = Quartz.CGWindowListCreateImageFromArray(
            screen_rect, window_list, Quartz.kCGWindowImageDefault
        )
        if cg_image is None:
            # Keep the documented (image, hitboxes) shape so callers can unpack
            return None, hitboxes

        # Create CGContext for compositing
        width = int(frame.size.width)
        height = int(frame.size.height)
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )
        Quartz.CGContextDrawImage(cg_context, screen_rect, cg_image)
        hitboxes.append({"hitbox": [0, 0, width, height], "target": [0, 0, width, height]})
    else:
        # Filter out windows that are not in the whitelist
        all_windows = [
            window
            for window in all_windows
            if window["owner"] in app_whitelist or window["role"] != "app"
        ]
        app_windows = [window for window in all_windows if window["role"] == "app"]

        dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom"

        menubar_length = (
            max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items)
            if menubar_items
            else 0
        )

        # Calculate bounds of app windows
        app_bounds = {
            "x": min(window["bounds"]["x"] for window in app_windows) if app_windows else 0,
            "y": min(window["bounds"]["y"] for window in app_windows) if app_windows else 0,
        }
        app_bounds["width"] = (
            max(window["bounds"]["x"] + window["bounds"]["width"] for window in app_windows)
            - app_bounds["x"]
            if app_windows
            else 0
        )
        app_bounds["height"] = (
            max(window["bounds"]["y"] + window["bounds"]["height"] for window in app_windows)
            - app_bounds["y"]
            if app_windows
            else 0
        )

        # Set minimum bounds of 256x256
        app_bounds["width"] = max(app_bounds["width"], 256)
        app_bounds["height"] = max(app_bounds["height"], 256)

        # Add dock bounds to app bounds
        if dock_orientation == "bottom":
            app_bounds["height"] += dock_bounds["height"] + 4
        elif dock_orientation == "side":
            if dock_bounds["x"] > frame.size.width / 2:
                app_bounds["width"] += dock_bounds["width"] + 4
            else:
                app_bounds["x"] -= dock_bounds["width"] + 4
                app_bounds["width"] += dock_bounds["width"] + 4

        # Add menubar bounds to app bounds
        app_bounds["height"] += menubar_bounds["height"]

        # Make sure app bounds contains menubar bounds
        app_bounds["width"] = max(app_bounds["width"], menubar_length)

        # Clamp bounds to screen
        app_bounds["x"] = max(app_bounds["x"], 0)
        app_bounds["y"] = max(app_bounds["y"], 0)
        app_bounds["width"] = min(app_bounds["width"], frame.size.width - app_bounds["x"])
        app_bounds["height"] = min(
            app_bounds["height"], frame.size.height - app_bounds["y"] + menubar_bounds["height"]
        )

        # Create CGContext for compositing
        width = int(app_bounds["width"])
        height = int(app_bounds["height"])
        color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB)
        cg_context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast
        )

        def _draw_layer(cg_context, all_windows, source_rect, target_rect):
            """Draw a layer of windows from source_rect to target_rect on the given context."""
            window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None)
            for window in all_windows:
                Foundation.CFArrayAppendValue(window_list, window["id"])
            cg_image = Quartz.CGWindowListCreateImageFromArray(
                source_rect, window_list, Quartz.kCGWindowImageDefault
            )
            if cg_image is not None:
                Quartz.CGContextDrawImage(cg_context, target_rect, cg_image)

        # --- FIRST PASS: desktop, apps ---
        source_position = [app_bounds["x"], app_bounds["y"]]
        source_size = [app_bounds["width"], app_bounds["height"]]
        target_position = [0, min(menubar_bounds["y"] + menubar_bounds["height"], app_bounds["y"])]
        target_size = [app_bounds["width"], app_bounds["height"]]

        if dock_orientation == "bottom":
            source_size[1] += dock_bounds["height"]
            target_size[1] += dock_bounds["height"]
        elif dock_orientation == "side":
            if dock_bounds["x"] < frame.size.width / 2:
                source_position[0] -= dock_bounds["width"]
                target_position[0] -= dock_bounds["width"]
            source_size[0] += dock_bounds["width"]
            target_size[0] += dock_bounds["width"]

        app_source_rect = Quartz.CGRectMake(
            source_position[0], source_position[1], source_size[0], source_size[1]
        )
        # The bitmap context's y axis is flipped relative to screen coordinates
        app_target_rect = Quartz.CGRectMake(
            target_position[0],
            app_bounds["height"] - target_position[1] - target_size[1],
            target_size[0],
            target_size[1],
        )
        first_pass_windows = [
            w for w in all_windows if w["role"] == "app" or w["role"] == "desktop"
        ]
        _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect)

        hitboxes.append(
            {
                "hitbox": [
                    0,
                    menubar_bounds["height"],
                    app_bounds["width"],
                    menubar_bounds["height"] + app_bounds["height"],
                ],
                "target": [
                    app_source_rect.origin.x,
                    app_source_rect.origin.y,
                    app_source_rect.origin.x + app_bounds["width"],
                    app_source_rect.origin.y + app_bounds["height"],
                ],
            }
        )

        # --- SECOND PASS: menubar ---
        allowed_roles = {"menubar"}
        menubar_windows = [w for w in all_windows if w["role"] in allowed_roles]
        menubar_source_rect = Quartz.CGRectMake(0, 0, app_bounds["width"], menubar_bounds["height"])
        menubar_target_rect = Quartz.CGRectMake(
            0,
            app_bounds["height"] - menubar_bounds["height"],
            app_bounds["width"],
            menubar_bounds["height"],
        )
        _draw_layer(cg_context, menubar_windows, menubar_source_rect, menubar_target_rect)

        hitboxes.append(
            {
                "hitbox": [0, 0, app_bounds["width"], menubar_bounds["height"]],
                "target": [0, 0, app_bounds["width"], menubar_bounds["height"]],
            }
        )

        # --- THIRD PASS: dock, filtered ---
        # Step 1: Collect dock items to draw, with their computed target rects
        dock_draw_items = []
        for index, item in enumerate(dock_items):
            source_position = (item["bounds"]["x"], item["bounds"]["y"])
            source_size = (item["bounds"]["width"], item["bounds"]["height"])

            # apply whitelist to middle items (first and last are always kept)
            if not (index == 0 or index == len(dock_items) - 1):
                if item["subrole"] == "AXApplicationDockItem":
                    if item["title"] not in app_whitelist:
                        continue
                elif item["subrole"] == "AXMinimizedWindowDockItem":
                    if not any(
                        window["name"] == item["title"]
                        and window["role"] == "app"
                        and window["owner"] in app_whitelist
                        for window in all_windows
                    ):
                        continue
                elif item["subrole"] == "AXFolderDockItem":
                    continue

            # Preserve unscaled (original) source position and size before any modification
            hitbox_position = source_position
            hitbox_size = source_size
            screen_position = source_position
            screen_size = source_size

            # stretch to screen size
            padding = 32
            if dock_orientation == "bottom":
                source_position = (source_position[0], 0)
                source_size = (source_size[0], frame.size.height)
                hitbox_position = (source_position[0], app_bounds["height"] - hitbox_size[1])
                hitbox_size = (source_size[0], hitbox_size[1])
                if index == 0:
                    source_size = (padding + source_size[0], source_size[1])
                    source_position = (source_position[0] - padding, 0)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0] + padding, source_size[1])
                    source_position = (source_position[0], 0)
            elif dock_orientation == "side":
                source_position = (0, source_position[1])
                source_size = (frame.size.width, source_size[1])
                hitbox_position = (
                    (
                        source_position[0]
                        if dock_bounds["x"] < frame.size.width / 2
                        else app_bounds["width"] - hitbox_size[0]
                    ),
                    source_position[1],
                )
                hitbox_size = (hitbox_size[0], source_size[1])
                if index == 0:
                    source_size = (source_size[0], padding + source_size[1])
                    source_position = (0, source_position[1] - padding)
                elif index == len(dock_items) - 1:
                    source_size = (source_size[0], source_size[1] + padding)
                    source_position = (0, source_position[1])

            # Compute the initial target position
            target_position = source_position
            target_size = source_size

            dock_draw_items.append(
                {
                    "item": item,
                    "index": index,
                    "source_position": source_position,
                    "source_size": source_size,
                    "target_size": target_size,
                    "target_position": target_position,  # Will be updated after packing
                    "hitbox_position": hitbox_position,
                    "hitbox_size": hitbox_size,
                    "screen_position": screen_position,
                    "screen_size": screen_size,
                }
            )

        # Step 2: Pack the target rects along the main axis, removing gaps
        packed_positions = []
        if dock_orientation == "bottom":
            # Pack left-to-right
            x_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((x_cursor, draw_item["target_position"][1]))
                x_cursor += draw_item["target_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds["width"] - packed_strip_length) / 2
            y_offset = frame.size.height - app_bounds["height"]
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px + x_offset, py - y_offset)

            # Pack unscaled source rects
            x_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (x_cursor, draw_item["hitbox_position"][1])
                x_cursor += draw_item["hitbox_size"][0]
            packed_strip_length = x_cursor
            # Center horizontally
            x_offset = (app_bounds["width"] - packed_strip_length) / 2
            for draw_item in dock_draw_items:
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px + x_offset, py)
        elif dock_orientation == "side":
            # Pack top-to-bottom
            y_cursor = 0
            for draw_item in dock_draw_items:
                packed_positions.append((draw_item["target_position"][0], y_cursor))
                y_cursor += draw_item["target_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds["height"] - packed_strip_length) / 2
            x_offset = (
                0
                if dock_bounds["x"] < frame.size.width / 2
                else frame.size.width - app_bounds["width"]
            )
            for i, draw_item in enumerate(dock_draw_items):
                px, py = packed_positions[i]
                draw_item["target_position"] = (px - x_offset, py + y_offset)

            # Pack unscaled source rects
            y_cursor = 0
            for draw_item in dock_draw_items:
                draw_item["hitbox_position"] = (draw_item["hitbox_position"][0], y_cursor)
                y_cursor += draw_item["hitbox_size"][1]
            packed_strip_length = y_cursor
            # Center vertically
            y_offset = (app_bounds["height"] - packed_strip_length) / 2
            for draw_item in dock_draw_items:
                px, py = draw_item["hitbox_position"]
                draw_item["hitbox_position"] = (px, py + y_offset)

        dock_windows = [window for window in all_windows if window["role"] == "dock"]

        # Step 3: Draw dock items using packed and recentered positions
        for draw_item in dock_draw_items:
            source_position = draw_item["source_position"]
            source_size = draw_item["source_size"]
            target_position = draw_item["target_position"]
            target_size = draw_item["target_size"]

            # flip target position y
            target_position = (
                target_position[0],
                app_bounds["height"] - target_position[1] - target_size[1],
            )

            source_rect = Quartz.CGRectMake(*source_position, *source_size)
            target_rect = Quartz.CGRectMake(*target_position, *target_size)
            _draw_layer(cg_context, dock_windows, source_rect, target_rect)

            hitbox_position = draw_item["hitbox_position"]
            hitbox_size = draw_item["hitbox_size"]

            # Debug: Draw true hitbox rect (packed position, unscaled size)
            # # Flip y like target_rect
            # hitbox_position_flipped = (
            #     hitbox_position[0],
            #     app_bounds['height'] - hitbox_position[1] - hitbox_size[1]
            # )
            # hitbox_rect = Quartz.CGRectMake(*hitbox_position_flipped, *hitbox_size)
            # Quartz.CGContextSetStrokeColorWithColor(cg_context, Quartz.CGColorCreateGenericRGB(0, 1, 0, 1))
            # Quartz.CGContextStrokeRect(cg_context, hitbox_rect)

            hitboxes.append(
                {
                    "hitbox": [
                        *hitbox_position,
                        hitbox_position[0] + hitbox_size[0],
                        hitbox_position[1] + hitbox_size[1],
                    ],
                    "target": [
                        *draw_item["screen_position"],
                        draw_item["screen_position"][0] + draw_item["screen_size"][0],
                        draw_item["screen_position"][1] + draw_item["screen_size"][1],
                    ],
                }
            )

    # Convert composited context to CGImage, then to a PIL image via PNG bytes
    final_cg_image = Quartz.CGBitmapContextCreateImage(cg_context)
    ns_image = AppKit.NSImage.alloc().initWithCGImage_size_(final_cg_image, Foundation.NSZeroSize)
    ns_data = ns_image.TIFFRepresentation()
    bitmap_rep = AppKit.NSBitmapImageRep.imageRepWithData_(ns_data)
    png_data = bitmap_rep.representationUsingType_properties_(AppKit.NSBitmapImageFileTypePNG, None)
    image_data = io.BytesIO(png_data)
    return Image.open(image_data), hitboxes
@timing_decorator
def get_menubar_items(active_app_pid: Optional[int] = None) -> List[Dict[str, Any]]:
    """Get menubar items from the active application using Accessibility API

    Args:
        active_app_pid: PID of the active application. When omitted, the
            frontmost application reported by NSWorkspace is used.

    Returns:
        List of dictionaries with menubar item information ("title",
        "bounds", "index", "app_pid"). The list is empty when the application,
        its menubar or the menubar's children cannot be obtained. Bounds
        components that cannot be read stay at 0.
    """
    menubar_items = []

    if active_app_pid is None:
        # Get the frontmost application's PID if none provided
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if frontmost_app:
            active_app_pid = frontmost_app.processIdentifier()
        else:
            logger.error("Error: Could not determine frontmost application")
            return menubar_items

    # Create an accessibility element for the application
    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        logger.error(f"Error: Could not create accessibility element for PID {active_app_pid}")
        return menubar_items

    # Get the menubar
    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        logger.error(f"Error: Could not get menubar for application with PID {active_app_pid}")
        return menubar_items

    # Get the menubar items
    children = element_attribute(menubar, kAXChildrenAttribute)
    if children is None:
        logger.error("Error: Could not get menubar items")
        return menubar_items

    # Process each menubar item
    for i, item in enumerate(children):
        # Get item title
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"

        # Create bounding box
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        # Get item position. element_value returns None when the AXValue
        # cannot be unpacked, so guard before reading its fields.
        position_ref = element_attribute(item, kAXPositionAttribute)
        if position_ref:
            point = element_value(position_ref, kAXValueCGPointType)
            if point is not None:
                bounds["x"] = point.x
                bounds["y"] = point.y

        # Get item size (same None guard as for the position)
        size_ref = element_attribute(item, kAXSizeAttribute)
        if size_ref:
            size = element_value(size_ref, kAXValueCGSizeType)
            if size is not None:
                bounds["width"] = size.width
                bounds["height"] = size.height

        # Add to list
        menubar_items.append(
            {"title": title, "bounds": bounds, "index": i, "app_pid": active_app_pid}
        )

    return menubar_items
@timing_decorator
def get_dock_items() -> List[Dict[str, Any]]:
    """Get all items in the macOS Dock

    The Dock process is located among the running applications, its first
    "AXList" child is taken as the item list, and every child of that list is
    described by a dictionary.

    Returns:
        List of dictionaries with Dock item information ("title",
        "description", "bounds", "index", "type", "role", "subrole"). The list
        is empty when the Dock process or its item list cannot be found.
        Bounds components that cannot be read stay at 0.
    """
    dock_items = []

    # Find the Dock process
    dock_pid = None
    running_apps = get_running_apps()
    for app in running_apps:
        if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock":
            dock_pid = app.processIdentifier()
            break

    if dock_pid is None:
        logger.error("Error: Could not find Dock process")
        return dock_items

    # Create an accessibility element for the Dock
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        logger.error(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")
        return dock_items

    # Get the Dock's main element
    dock_list = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_list is None or len(dock_list) == 0:
        logger.error("Error: Could not get Dock children")
        return dock_items

    # Find the Dock's application list (usually the first child)
    dock_app_list = None
    for child in dock_list:
        role = element_attribute(child, kAXRoleAttribute)
        if role == "AXList":
            dock_app_list = child
            break

    if dock_app_list is None:
        logger.error("Error: Could not find Dock application list")
        return dock_items

    # Get all items in the Dock
    items = element_attribute(dock_app_list, kAXChildrenAttribute)
    if items is None:
        logger.error("Error: Could not get Dock items")
        return dock_items

    # Process each Dock item
    for i, item in enumerate(items):
        # Get item attributes
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        description = element_attribute(item, "AXDescription") or ""
        role = element_attribute(item, kAXRoleAttribute) or ""
        subrole = element_attribute(item, "AXSubrole") or ""

        # Create bounding box
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        # Get item position. element_value returns None when the AXValue
        # cannot be unpacked, so guard before reading its fields.
        position_ref = element_attribute(item, kAXPositionAttribute)
        if position_ref:
            point = element_value(position_ref, kAXValueCGPointType)
            if point is not None:
                bounds["x"] = point.x
                bounds["y"] = point.y

        # Get item size (same None guard as for the position)
        size_ref = element_attribute(item, kAXSizeAttribute)
        if size_ref:
            size = element_value(size_ref, kAXValueCGSizeType)
            if size is not None:
                bounds["width"] = size.width
                bounds["height"] = size.height

        # Determine if this is an application, file/folder, or separator
        item_type = "unknown"
        if subrole == "AXApplicationDockItem":
            item_type = "application"
        elif subrole == "AXFolderDockItem":
            item_type = "folder"
        elif subrole == "AXDocumentDockItem":
            item_type = "document"
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"

        # Add to list
        dock_items.append(
            {
                "title": title,
                "description": description,
                "bounds": bounds,
                "index": i,
                "type": item_type,
                "role": role,
                "subrole": subrole,
            }
        )

    return dock_items
class AppActivationContext:
    """Context manager that temporarily brings an application to the front.

    On entry, when a PID was supplied, the currently frontmost application is
    remembered and the running application with that PID is activated. On
    exit the remembered application, if any, is activated again. Each
    activation is accompanied by a 0.5 second pause.
    """

    def __init__(self, active_app_pid=None, active_app_to_use="", logger=None):
        self.logger = logger
        self.active_app_to_use = active_app_to_use
        self.active_app_pid = active_app_pid
        # Filled in by __enter__; stays None when no activation is requested.
        self.frontmost_app = None

    def __enter__(self):
        from AppKit import NSWorkspace

        if not self.active_app_pid:
            return self

        if self.logger and self.active_app_to_use:
            self.logger.debug(
                f"Automatically activating app '{self.active_app_to_use}' for screenshot composition"
            )

        workspace = NSWorkspace.sharedWorkspace()
        self.frontmost_app = workspace.frontmostApplication()
        target = next(
            (
                candidate
                for candidate in workspace.runningApplications()
                if candidate.processIdentifier() == self.active_app_pid
            ),
            None,
        )
        if target is not None:
            target.activateWithOptions_(0)
            # pause for 0.5 seconds after activating
            time.sleep(0.5)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.frontmost_app:
            return
        # pause for 0.5 seconds before handing focus back
        time.sleep(0.5)
        self.frontmost_app.activateWithOptions_(0)
def get_frontmost_and_active_app(all_windows, running_apps, app_whitelist):
    """Pick the application that should be active for the screenshot.

    The windows are scanned from the end of all_windows backwards (highest
    z_index first) for the first on-screen window with role "app" whose owner
    passes the whitelist. When no owner is found this way, "Finder" is used
    and its PID is looked up among running_apps.

    Args:
        all_windows: Window dictionaries ordered by ascending z_index
        running_apps: Running application objects
        app_whitelist: Optional list of allowed owner names (None allows all)

    Returns:
        Tuple of (current frontmost application, name of the app to activate,
        PID of the app to activate)
    """
    from AppKit import NSWorkspace

    frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()

    def _is_candidate(window):
        # Must be an on-screen app window that is not filtered out.
        if window.get("role") != "app":
            return False
        if not window.get("is_on_screen"):
            return False
        return app_whitelist is None or window.get("owner") in app_whitelist

    active_app_to_use = None
    active_app_pid = None

    topmost = next((w for w in reversed(all_windows) if _is_candidate(w)), None)
    if topmost is not None:
        active_app_to_use = topmost.get("owner")
        active_app_pid = topmost.get("pid")

    # If no suitable app found, use Finder
    if active_app_to_use is None:
        active_app_to_use = "Finder"
        finder = next((app for app in running_apps if app.localizedName() == "Finder"), None)
        if finder is not None:
            active_app_pid = finder.processIdentifier()

    return frontmost_app, active_app_to_use, active_app_pid
def capture_all_apps(
    save_to_disk: bool = False,
    app_whitelist: Optional[List[str]] = None,
    output_dir: Optional[str] = None,
    take_focus: bool = True,
) -> Tuple[Dict[str, Any], Optional[Image.Image]]:
    """Capture screenshots of all running applications

    Args:
        save_to_disk: Whether to save screenshots to disk
        app_whitelist: Optional list of app names to include in the recomposited screenshot
            (will always include 'Window Server' and 'Dock')
        output_dir: Directory that receives the saved images; only used when
            save_to_disk is true. It is created if it does not exist.
        take_focus: Whether to temporarily activate the topmost matching app
            while capturing (focus is handed back afterwards)

    Returns:
        Dictionary with application information and screenshots
        Optional PIL Image of the recomposited screenshot
    """
    result = {
        "timestamp": time.time(),
        "applications": [],
        "windows": [],  # New array to store all windows, including those without apps
        "menubar_items": [],  # New array to store menubar items
        "dock_items": [],  # New array to store dock items
    }

    # Get all windows with z-order information
    all_windows = get_all_windows()

    # Get all running applications
    running_apps = get_running_apps()

    # The previously frontmost app is tracked by AppActivationContext itself,
    # so the first element of the tuple is not needed here.
    _frontmost_app, active_app_to_use, active_app_pid = (
        get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
        if take_focus
        else (None, None, None)
    )

    # Use AppActivationContext to activate the app and restore focus
    with AppActivationContext(active_app_pid, active_app_to_use, logger):
        # Process applications
        for app in running_apps:
            # Skip system apps without a bundle ID
            if app.bundleIdentifier() is None:
                continue

            app_info = get_app_info(app)
            app_windows = get_app_windows(app.processIdentifier(), all_windows)
            app_data = {"info": app_info, "windows": [window["id"] for window in app_windows]}
            result["applications"].append(app_data)

        # Add all windows to the result
        result["windows"] = all_windows

        # Get menubar items from the active application
        menubar_items = get_menubar_items(active_app_pid)
        result["menubar_items"] = menubar_items

        # Get dock items
        dock_items = get_dock_items()
        result["dock_items"] = dock_items

        # Get menubar bounds
        menubar_bounds = get_menubar_bounds()
        result["menubar_bounds"] = menubar_bounds

        # Get dock bounds
        dock_bounds = get_dock_bounds()
        result["dock_bounds"] = dock_bounds

        # Capture the entire desktop using Quartz compositing.
        # A bare None result is tolerated so a failed capture cannot break
        # the tuple unpacking.
        capture = draw_desktop_screenshot(
            app_whitelist, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items
        )
        desktop_screenshot, hitboxes = capture if capture is not None else (None, [])

        result["hitboxes"] = hitboxes

        from PIL import Image, ImageChops, ImageDraw

        def _draw_hitboxes(img, hitboxes, key="target"):
            """
            Overlay opaque colored rectangles for each hitbox (using hitbox[key])
            with color depending on index, then multiply overlay onto img.
            Args:
                img: PIL.Image (RGBA or RGB)
                hitboxes: list of dicts with 'hitbox' and 'target' keys
                key: 'hitbox' or 'target'
            Returns:
                PIL.Image with overlayed hitboxes (same mode/size as input)
            """
            # Ensure RGBA mode for blending
            base = img.convert("RGBA")
            overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(overlay)

            # Distinct colors for order
            colors = [
                (255, 0, 0, 180),  # Red
                (0, 255, 0, 180),  # Green
                (0, 0, 255, 180),  # Blue
                (255, 255, 0, 180),  # Yellow
                (0, 255, 255, 180),  # Cyan
                (255, 0, 255, 180),  # Magenta
                (255, 128, 0, 180),  # Orange
                (128, 0, 255, 180),  # Purple
                (0, 128, 255, 180),  # Sky blue
                (128, 255, 0, 180),  # Lime
            ]
            # Set minimum brightness for colors
            min_brightness = 0
            colors = [
                (
                    max(min_brightness, c[0]),
                    max(min_brightness, c[1]),
                    max(min_brightness, c[2]),
                    c[3],
                )
                for c in colors
            ]

            for i, h in enumerate(hitboxes):
                rect = h.get(key)
                color = colors[i % len(colors)]
                if rect:
                    draw.rectangle(rect, fill=color)

            # Multiply blend overlay onto base
            blended = ImageChops.multiply(base, overlay)
            return blended

        # DEBUG: Save hitboxes to disk
        if desktop_screenshot and save_to_disk and output_dir:
            # Saving fails with FileNotFoundError if the directory is missing
            os.makedirs(output_dir, exist_ok=True)

            desktop_path = os.path.join(output_dir, "desktop.png")
            desktop_screenshot.save(desktop_path)
            result["desktop_screenshot"] = desktop_path

            logger.info(f"Saved desktop screenshot to {desktop_path}")

            if app_whitelist:
                # Take screenshot without whitelist
                full_capture = draw_desktop_screenshot(
                    None, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items
                )
                desktop_screenshot_full, hitboxes_full = (
                    full_capture if full_capture is not None else (None, [])
                )

                # Draw hitboxes on both images using overlay. The whitelisted
                # capture's "target" rects are drawn onto the full image.
                img1 = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                img2 = (
                    _draw_hitboxes(desktop_screenshot_full.copy(), hitboxes, key="target")
                    if desktop_screenshot_full
                    else None
                )

                if img2 and hitboxes_full:
                    # Compose side-by-side
                    width = img1.width + img2.width
                    height = max(img1.height, img2.height)
                    combined = Image.new("RGBA", (width, height), (0, 0, 0, 0))
                    combined.paste(img1, (0, 0))
                    combined.paste(img2, (img1.width, 0))
                    side_by_side_path = os.path.join(output_dir, "side_by_side_hitboxes.png")
                    combined.save(side_by_side_path)
                    result["side_by_side_hitboxes"] = side_by_side_path
            else:
                # Overlay hitboxes using new function
                hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
                hitbox_path = os.path.join(output_dir, "hitboxes.png")
                hitbox_img.save(hitbox_path)
                result["hitbox_screenshot"] = hitbox_path

    # Focus restoration is now handled by AppActivationContext

    return result, desktop_screenshot
async def run_capture():
    """Command-line entry point for the screenshot capture tool.

    Parses the CLI flags and then runs one of two modes:

    * demo mode (``--demo``): captures one screenshot per requested app group
      and packs them into a single ``demo_mosaic.png``;
    * normal mode: captures every running application, prints a summary
      (optionally with menubar / Dock listings) and writes ``metadata.json``
      into the output directory.
    """
    cli = argparse.ArgumentParser(
        description="Capture screenshots of running macOS applications"
    )
    cli.add_argument(
        "--output", "-o", help="Output directory for screenshots", default="app_screenshots"
    )
    cli.add_argument(
        "--filter",
        "-f",
        nargs="+",
        help="Filter recomposited screenshot to only include specified apps",
    )
    cli.add_argument(
        "--menubar",
        "-m",
        action="store_true",
        help="List menubar and status items with their bounding boxes",
    )
    cli.add_argument(
        "--dock", "-d", action="store_true", help="List Dock items with their bounding boxes"
    )
    cli.add_argument(
        "--demo",
        nargs="*",
        help="Demo mode: pass app names to capture individual and combinations, create mosaic PNG",
    )
    args = cli.parse_args()

    # A relative --output is resolved against the current working directory.
    if os.path.isabs(args.output):
        output_dir = args.output
    else:
        output_dir = os.path.join(os.getcwd(), args.output)

    def bounds_line(box):
        """Format a bounds dict the same way for menubar and Dock listings."""
        return f" Bounds: x={box['x']}, y={box['y']}, width={box['width']}, height={box['height']}"

    # ---- Demo mode: one capture per app group, then a packed mosaic ----
    if args.demo:
        from PIL import Image

        demo_apps = args.demo
        print(f"Running in DEMO mode for apps: {demo_apps}")

        # "A/B" requests a combined capture of A and B; a plain name is a
        # single-app group.
        groups = []
        for spec in demo_apps:
            if "/" in spec:
                members = [name for name in map(str.strip, spec.split("/")) if name]
            else:
                members = [spec.strip()]
            if members:
                groups.append(members)

        captured = []
        for members in groups:
            print(f"Capturing for apps: {members}")
            _, shot = capture_all_apps(app_whitelist=members)
            if shot:
                captured.append((members, shot))

        if not captured:
            print("No screenshots captured in demo mode.")
            return

        def make_mosaic(images, pad=64, bg=(30, 30, 30)):
            """Pack the captured images into one canvas using rpack's layout."""
            import rpack

            sizes = [(picture.width + pad, picture.height + pad) for _, picture in images]
            positions = rpack.pack(sizes)
            # The canvas must reach the far edge of the right-most / lowest tile.
            canvas_width = max(left + width for (left, _), (width, _) in zip(positions, sizes))
            canvas_height = max(top + height for (_, top), (_, height) in zip(positions, sizes))
            mosaic = Image.new("RGBA", (canvas_width, canvas_height), bg)
            for (_, picture), (left, top) in zip(images, positions):
                mosaic.paste(picture, (left, top))
            return mosaic

        mosaic_img = make_mosaic(captured)
        mosaic_path = os.path.join(output_dir, "demo_mosaic.png")
        os.makedirs(output_dir, exist_ok=True)
        mosaic_img.save(mosaic_path)
        print(f"Demo mosaic saved to: {mosaic_path}")
        return

    # ---- Normal mode: capture everything and report ----
    print("Capturing screenshots of all running applications...")
    print(f"Saving screenshots to: {output_dir}")

    if args.filter:
        print(
            f"Filtering recomposited screenshot to only include: {', '.join(args.filter)} (plus Window Server and Dock)"
        )

    result, img = capture_all_apps(
        save_to_disk=True, app_whitelist=args.filter, output_dir=output_dir, take_focus=True
    )

    # Summary
    applications = result["applications"]
    print("\nCapture complete!")
    print(f"Captured {len(applications)} applications")
    total_app_windows = sum(len(entry["windows"]) for entry in applications)
    print(f"Total application windows captured: {total_app_windows}")
    print(f"Total standalone windows captured: {len(result['windows'])}")

    # Per-application breakdown
    print("\nApplication details:")
    for entry in applications:
        app_info = entry["info"]
        windows = entry["windows"]
        print(f" - {app_info['name']} ({len(windows)} windows)")

    if "desktop_screenshot" in result:
        print(f"\nRecomposited screenshot saved to: {result['desktop_screenshot']}")

    # Optional menubar listing
    if args.menubar and "menubar_items" in result:
        print("\nMenubar items:")
        # Lets each item be labelled with its owning application's name.
        app_name_by_pid = {
            entry["info"]["pid"]: entry["info"]["name"] for entry in result["applications"]
        }
        for item in result["menubar_items"]:
            print(f" - {item['title']}")
            print(bounds_line(item["bounds"]))
            if "app_pid" in item:
                app_name = app_name_by_pid.get(
                    item["app_pid"], f"Unknown App (PID: {item['app_pid']})"
                )
                print(f" App: {app_name} (PID: {item['app_pid']})")
            if "window_id" in item:
                print(f" Window ID: {item['window_id']}")
            if "owner" in item:
                print(f" Owner: {item['owner']}")
            if "layer" in item and "z_index" in item:
                print(f" Layer: {item['layer']}, Z-Index: {item['z_index']}")
            print("")

    # Optional Dock listing
    if args.dock and "dock_items" in result:
        print("\nDock items:")
        for item in result["dock_items"]:
            print(f" - {item['title']} ({item['type']})")
            print(f" Description: {item['description']}")
            print(bounds_line(item["bounds"]))
            print(f" Role: {item['role']}, Subrole: {item['subrole']}")
            print(f" Index: {item['index']}")
            print("")

    # Persist the full capture result next to the screenshots.
    metadata_path = os.path.join(output_dir, "metadata.json")
    with open(metadata_path, "w") as f:
        json.dump(result, f, indent=2)
    print(f"\nMetadata saved to: {metadata_path}")
# Script entry point: run_capture() is a coroutine function, so it is driven
# through asyncio.run() when the file is executed directly.
if __name__ == "__main__":
    asyncio.run(run_capture())
```
--------------------------------------------------------------------------------
/libs/python/computer-server/computer_server/handlers/macos.py:
--------------------------------------------------------------------------------
```python
import pyautogui
pyautogui.FAILSAFE = False
import asyncio
import base64
import copy
import json
import logging
import re
import time
from ctypes import POINTER, byref, c_void_p
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple
import AppKit
import Foundation
import objc
from AppKit import NSWorkspace # type: ignore
from ApplicationServices import AXUIElementCopyAttributeValue # type: ignore
from ApplicationServices import AXUIElementCopyAttributeValues # type: ignore
from ApplicationServices import AXUIElementCreateApplication # type: ignore
from ApplicationServices import AXUIElementCreateSystemWide # type: ignore
from ApplicationServices import AXUIElementGetTypeID # type: ignore
from ApplicationServices import AXValueGetType # type: ignore
from ApplicationServices import AXValueGetValue # type: ignore
from ApplicationServices import kAXChildrenAttribute # type: ignore
from ApplicationServices import kAXDescriptionAttribute # type: ignore
from ApplicationServices import kAXEnabledAttribute # type: ignore
from ApplicationServices import kAXErrorSuccess # type: ignore
from ApplicationServices import kAXFocusedApplicationAttribute # type: ignore
from ApplicationServices import kAXFocusedUIElementAttribute # type: ignore
from ApplicationServices import kAXFocusedWindowAttribute # type: ignore
from ApplicationServices import kAXMainWindowAttribute # type: ignore
from ApplicationServices import kAXPositionAttribute # type: ignore
from ApplicationServices import kAXRoleAttribute # type: ignore
from ApplicationServices import kAXRoleDescriptionAttribute # type: ignore
from ApplicationServices import kAXSelectedTextAttribute # type: ignore
from ApplicationServices import kAXSelectedTextRangeAttribute # type: ignore
from ApplicationServices import kAXSizeAttribute # type: ignore
from ApplicationServices import kAXTitleAttribute # type: ignore
from ApplicationServices import kAXValueAttribute # type: ignore
from ApplicationServices import kAXValueCFRangeType # type: ignore
from ApplicationServices import kAXValueCGPointType # type: ignore
from ApplicationServices import kAXValueCGSizeType # type: ignore
from ApplicationServices import kAXVisibleChildrenAttribute # type: ignore
from ApplicationServices import kAXWindowsAttribute # type: ignore
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from pynput.mouse import Button
from pynput.mouse import Controller as MouseController
from Quartz.CoreGraphics import * # type: ignore
from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore
from .base import BaseAccessibilityHandler, BaseAutomationHandler
# Module-level logger for this handler module.
logger = logging.getLogger(__name__)

# Trigger accessibility permissions prompt on macOS
# NOTE: this is top-level code, so it runs as a side effect of importing the
# module. Any failure is only logged at debug level.
try:
    # Source - https://stackoverflow.com/a/17134
    # Posted by Andreas
    # Retrieved 2025-12-03, License - CC BY-SA 4.0
    # Attempt to create and post a mouse event to trigger the permissions prompt
    # This will cause macOS to show "Python would like to control this computer using accessibility features"
    # The event is a mouse-move to the cursor's current location.
    current_pos = CGEventGetLocation(CGEventCreate(None))
    p = CGPoint()
    p.x = current_pos.x
    p.y = current_pos.y
    me = CGEventCreateMouseEvent(None, kCGEventMouseMoved, p, 0)
    if me:
        CGEventPost(kCGHIDEventTap, me)
        # NOTE(review): PyObjC normally manages Core Foundation object lifetimes
        # itself; confirm this explicit CFRelease cannot over-release `me`.
        CFRelease(me)
except Exception as e:
    logger.debug(f"Failed to trigger accessibility permissions prompt: {e}")

# Trigger screen recording prompt on macOS
# Taking a throwaway screenshot at import time; the image itself is discarded.
try:
    # pyautogui is already imported at the top of the file; this re-import is a no-op.
    import pyautogui

    pyautogui.screenshot()
except Exception as e:
    logger.debug(f"Failed to trigger screenshot permissions prompt: {e}")
# Constants for accessibility API
# NOTE: kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute, kAXValueAttribute,
# kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute and
# kAXChildrenAttribute are also imported from ApplicationServices at the top of
# this file; the assignments below rebind those names to plain Python values.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties
# (key names used in the window-info dictionaries)
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options
# Values are bit flags, so they can be OR-ed together.
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1,  # NSApplicationActivateIgnoringOtherApps
}
def CFAttributeToPyObject(attrValue):
    """Translate a Core Foundation / AX attribute value into a Python object.

    The value's CF type id selects a converter (string, boolean, array,
    number, or AX UI element). When the CF type id is not one of those, the
    value is treated as an AXValue and its textual description is parsed into
    a tuple (size, point or range).

    Args:
        attrValue: Core Foundation attribute value to convert

    Returns:
        The converted Python object, or None when no conversion applies
    """

    def convert_array(cf_array):
        """Convert every member of a CF array recursively.

        Args:
            cf_array: Core Foundation array to convert

        Returns:
            Python list with each member converted
        """
        return [CFAttributeToPyObject(member) for member in cf_array]

    def convert_number(cf_number):
        """Read a CF number as an int if possible, otherwise as a float.

        Args:
            cf_number: Core Foundation number to convert

        Returns:
            Python int or float, or None if neither read succeeds
        """
        ok, as_int = Foundation.CFNumberGetValue(  # type: ignore
            cf_number, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if ok:
            return int(as_int)
        ok, as_double = Foundation.CFNumberGetValue(  # type: ignore
            cf_number, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        return float(as_double) if ok else None

    # Dispatch table keyed by CF type id; AX UI elements pass through untouched.
    converters = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): convert_array,  # type: ignore
        Foundation.CFNumberGetTypeID(): convert_number,  # type: ignore
        AXUIElementGetTypeID(): lambda ax_element: ax_element,  # type: ignore
    }
    type_id = Foundation.CFGetTypeID(attrValue)  # type: ignore
    try:
        return converters[type_id](attrValue)
    except KeyError:
        # Not a supported CF type: fall through to the AXValue handling below.
        pass

    ax_kind = AXValueGetType(attrValue)
    parsers = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        # The "{...}" portion of the description is what the NS*FromString
        # parsers are given.
        braces = re.search("{.*}", attrValue.description())
        if not braces:
            return None
        return tuple(parsers[ax_kind](braces.group()))
    except KeyError:
        # AXValue kind without a parser.
        return None
def element_attribute(element, attribute):
    """Read one attribute from an accessibility element.

    The children attribute is first requested through the ranged
    ``AXUIElementCopyAttributeValues`` call (indices 0..999); if that does not
    succeed, or for any other attribute, the plain
    ``AXUIElementCopyAttributeValue`` call is used.

    Args:
        element: The accessibility element
        attribute: The attribute name to retrieve

    Returns:
        The attribute value (NSArray results are converted to Python lists),
        or None if the attribute could not be read
    """

    def as_python(raw):
        # Only arrays are converted; every other value is returned untouched.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, payload = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return as_python(payload)

    status, payload = AXUIElementCopyAttributeValue(element, attribute, None)
    if status != kAXErrorSuccess:
        return None
    return as_python(payload)
def element_value(element, type):
    """Unpack the typed payload carried by an AXValue.

    Args:
        element: The AXValue to unpack
        type: The AXValue type constant the payload is expected to have

    Returns:
        The unpacked value, or None when ``AXValueGetValue`` does not report True
    """
    succeeded, payload = AXValueGetValue(element, type, None)
    # Deliberately an equality test against True, not a truthiness test.
    return payload if succeeded == True else None  # noqa: E712
class UIElement:
    """Represents a UI element in the accessibility tree with position, size, and hierarchy information.

    Attributes set by ``__init__``:
        ax_element: the wrapped accessibility element
        role, name, enabled, description, role_description, value: AX attributes
        absolute_position: position as reported by the accessibility API
        position: position with the (non-negative) offset subtracted
        size: element size, or None if unavailable
        bbox / visible_bbox: ``[x1, y1, x2, y2]`` lists, or None
        center: ``(x, y)`` tuple, or None when position or size is unavailable
        children: list of child ``UIElement`` objects
        identifier / content_identifier: MD5-based hashes (empty string if not computed)
    """

    def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
        """Initialize a UIElement from an accessibility element.

        Args:
            element: The accessibility element to wrap
            offset_x: X offset for position calculations
            offset_y: Y offset for position calculations
            max_depth: Maximum depth to traverse for children (None = unlimited)
            parents_visible_bbox: Parent's visible bounding box for clipping
        """
        self.ax_element = element
        self.content_identifier = ""
        self.identifier = ""
        self.name = ""
        self.children = []
        self.description = ""
        self.role_description = ""
        self.value = None
        self.max_depth = max_depth
        # Always defined so consumers can test it instead of hitting an
        # AttributeError on elements that have no geometry.
        self.center = None

        # Set role
        self.role = element_attribute(element, kAXRoleAttribute)
        if self.role is None:
            self.role = "No role"

        # Set name
        self.name = element_attribute(element, kAXTitleAttribute)
        if self.name is not None:
            # Convert tuple to string if needed
            if isinstance(self.name, tuple):
                self.name = str(self.name[0]) if self.name else ""
            self.name = self.name.replace(" ", "_")

        # Set enabled
        self.enabled = element_attribute(element, kAXEnabledAttribute)
        if self.enabled is None:
            self.enabled = False

        # Set position and size
        position = element_attribute(element, kAXPositionAttribute)
        size = element_attribute(element, kAXSizeAttribute)
        start_position = element_value(position, kAXValueCGPointType)

        # A window becomes the origin for itself and everything below it.
        if self.role == "AXWindow" and start_position is not None:
            offset_x = start_position.x
            offset_y = start_position.y

        # Copy BEFORE the in-place subtraction below: self.position and
        # start_position are the same object and get mutated.
        self.absolute_position = copy.copy(start_position)
        self.position = start_position
        if self.position is not None:
            self.position.x -= max(0, offset_x)
            self.position.y -= max(0, offset_y)
        self.size = element_value(size, kAXValueCGSizeType)
        self._set_bboxes(parents_visible_bbox)

        # Without geometry there is nothing further to compute; the element
        # keeps its defaults (no center, value, children or hashes).
        if start_position is None or self.size is None:
            logger.debug(
                "UIElement with role %r has no position or size; skipping center, value and children",
                self.role,
            )
            return

        # Set component center (start_position already has the offset removed,
        # so it is added back here).
        self.center = (
            start_position.x + offset_x + self.size.width / 2,
            start_position.y + offset_y + self.size.height / 2,
        )

        self.description = element_attribute(element, kAXDescriptionAttribute)
        self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
        attribute_value = element_attribute(element, kAXValueAttribute)

        # Set value
        self.value = attribute_value
        if attribute_value is not None:
            if isinstance(attribute_value, Foundation.NSArray):  # type: ignore
                self.value = list(attribute_value)
            # Check if it's an accessibility element by checking its type ID
            elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID():  # type: ignore
                self.value = UIElement(attribute_value, offset_x, offset_y)

        # Set children
        if self.max_depth is None or self.max_depth > 0:
            self.children = self._get_children(element, start_position, offset_x, offset_y)
        else:
            self.children = []

        self.calculate_hashes()

    def _set_bboxes(self, parents_visible_bbox):
        """Set bounding box and visible bounding box for the element.

        ``bbox`` is the element's own rectangle; ``visible_bbox`` is that
        rectangle clipped to the parent's visible rectangle, or None when the
        two do not intersect.

        Args:
            parents_visible_bbox: Parent's visible bounding box for intersection calculation
        """
        if not self.absolute_position or not self.size:
            self.bbox = None
            self.visible_bbox = None
            return

        self.bbox = [
            int(self.absolute_position.x),
            int(self.absolute_position.y),
            int(self.absolute_position.x + self.size.width),
            int(self.absolute_position.y + self.size.height),
        ]

        if not parents_visible_bbox:
            self.visible_bbox = self.bbox
            return

        # Entirely outside the parent's visible area -> nothing visible.
        if (
            self.bbox[0] > parents_visible_bbox[2]
            or self.bbox[1] > parents_visible_bbox[3]
            or self.bbox[2] < parents_visible_bbox[0]
            or self.bbox[3] < parents_visible_bbox[1]
        ):
            self.visible_bbox = None
        else:
            self.visible_bbox = [
                int(max(self.bbox[0], parents_visible_bbox[0])),
                int(max(self.bbox[1], parents_visible_bbox[1])),
                int(min(self.bbox[2], parents_visible_bbox[2])),
                int(min(self.bbox[3], parents_visible_bbox[3])),
            ]

    def _get_children(self, element, start_position, offset_x, offset_y):
        """Get child elements from the accessibility element.

        The full children list is preferred; the visible-children list is only
        used when the element reports no children attribute at all.

        Args:
            element: The parent accessibility element
            start_position: Starting position for offset calculations (currently unused)
            offset_x: X offset for child positioning
            offset_y: Y offset for child positioning

        Returns:
            List of UIElement children
        """
        children = element_attribute(element, kAXChildrenAttribute)
        visible_children = element_attribute(element, kAXVisibleChildrenAttribute)

        found_children = []
        if children is not None:
            found_children.extend(children)
        elif visible_children is not None:
            found_children.extend(visible_children)

        result = []
        if self.max_depth is None or self.max_depth > 0:
            # Each level consumes one unit of the remaining depth budget.
            child_depth = self.max_depth - 1 if self.max_depth is not None else None
            for child in found_children:
                result.append(
                    UIElement(child, offset_x, offset_y, child_depth, self.visible_bbox)
                )
        return result

    def calculate_hashes(self):
        """Calculate unique identifiers for the element and its content."""
        self.identifier = self.component_hash()
        self.content_identifier = self.children_content_hash(self.children)

    def component_hash(self):
        """Generate a hash identifier for this component based on its properties.

        Returns:
            MD5 hash string of position, size, enabled flag and role, or an
            empty string when position or size is missing
        """
        if self.position is None or self.size is None:
            return ""
        position_string = f"{self.position.x:.0f};{self.position.y:.0f}"
        size_string = f"{self.size.width:.0f};{self.size.height:.0f}"
        enabled_string = str(self.enabled)
        # Ensure role is a string
        role_string = ""
        if self.role is not None:
            role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role)
        return self.hash_from_string(position_string + size_string + enabled_string + role_string)

    def hash_from_string(self, string):
        """Generate MD5 hash from a string.

        Args:
            string: Input string to hash

        Returns:
            MD5 hash hexdigest or empty string if input is None/empty
        """
        if string is None or string == "":
            return ""
        from hashlib import md5

        return md5(string.encode()).hexdigest()

    def children_content_hash(self, children):
        """Generate a hash representing the content and structure of child elements.

        Args:
            children: List of child UIElement objects

        Returns:
            Combined hash of children content and structure, or an empty
            string when there are no children
        """
        if len(children) == 0:
            return ""

        # Content hashes are order-independent (sorted); structure hashes keep
        # the children's original order.
        all_content_hashes = sorted(child.content_identifier for child in children)
        all_hashes = [child.identifier for child in children]

        content_hash = self.hash_from_string("".join(all_content_hashes))
        content_structure_hash = self.hash_from_string("".join(all_hashes))
        # NOTE(review): str.join here interleaves content_hash between the
        # characters of content_structure_hash; plain concatenation was probably
        # intended. Left unchanged so existing identifiers stay stable.
        return self.hash_from_string(content_hash.join(content_structure_hash))

    def to_dict(self):
        """Convert the UIElement to a dictionary representation.

        Returns:
            Dictionary containing all element properties and children
        """
        value = self.value
        if isinstance(value, UIElement):
            # A nested element is embedded as a JSON string.
            value = json.dumps(value.to_dict(), indent=4)
        elif isinstance(value, AppKit.NSDate):  # type: ignore
            value = str(value)

        if self.absolute_position is not None:
            absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}"
        else:
            absolute_position = ""

        if self.position is not None:
            position = f"{self.position.x:.2f};{self.position.y:.2f}"
        else:
            position = ""

        if self.size is not None:
            size = f"{self.size.width:.0f};{self.size.height:.0f}"
        else:
            size = ""

        return {
            "id": self.identifier,
            "name": self.name,
            "role": self.role,
            "description": self.description,
            "role_description": self.role_description,
            "value": value,
            "absolute_position": absolute_position,
            "position": position,
            "size": size,
            "enabled": self.enabled,
            "bbox": self.bbox,
            "visible_bbox": self.visible_bbox,
            "children": [child.to_dict() for child in self.children],
        }
from pathlib import Path
import Quartz
from AppKit import NSRunningApplication, NSWorkspace
def get_all_windows_zorder():
    """List every window known to the window server, ordered by z-index.

    On-screen windows receive a z-index from their place in the reversed
    on-screen window list; windows absent from that list get ``-1``. Windows
    that report no bounds are left out.

    Returns:
        List of window dictionaries sorted by z-index, containing window properties
        like id, name, pid, owner, role, bounds, layer, and opacity
    """
    on_screen = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID
    )
    # Index into the reversed on-screen list becomes the z-index.
    stacking = {}
    for rank, entry in enumerate(on_screen[::-1]):
        stacking[entry["kCGWindowNumber"]] = rank

    every_window = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID
    )

    def classify(title, owner):
        """Map a window's name/owner pair to dock, menubar, desktop or app."""
        if title == "Dock" and owner == "Dock":
            return "dock"
        if title == "Menubar" and owner == "Window Server":
            return "menubar"
        if owner in ["Window Server", "Dock"]:
            return "desktop"
        return "app"

    described = []
    for entry in every_window:
        rect = entry.get("kCGWindowBounds", {})
        if not rect:
            # No geometry reported: not included in the result.
            continue
        number = entry.get("kCGWindowNumber", 0)
        title = entry.get("kCGWindowName", "")
        owner = entry.get("kCGWindowOwnerName", "")
        described.append(
            {
                "id": number,
                "name": title or "Unnamed Window",
                "pid": entry.get("kCGWindowOwnerPID", 0),
                "owner": owner,
                "role": classify(title, owner),
                "is_on_screen": entry.get("kCGWindowIsOnscreen", False),
                "bounds": {
                    "x": rect.get("X", 0),
                    "y": rect.get("Y", 0),
                    "width": rect.get("Width", 0),
                    "height": rect.get("Height", 0),
                },
                "layer": entry.get("kCGWindowLayer", 0),
                "z_index": stacking.get(number, -1),
                "opacity": entry.get("kCGWindowAlpha", 1.0),
            }
        )

    described.sort(key=lambda info: info["z_index"])
    return described
def get_app_info(app):
    """Summarise a running application as a plain dictionary.

    Args:
        app: NSRunningApplication instance

    Returns:
        Dictionary with the keys name, bundle_id, pid, active, hidden and
        terminated, each filled from the matching accessor on ``app``
    """
    # (output key, accessor method on the application object)
    accessors = (
        ("name", "localizedName"),
        ("bundle_id", "bundleIdentifier"),
        ("pid", "processIdentifier"),
        ("active", "isActive"),
        ("hidden", "isHidden"),
        ("terminated", "isTerminated"),
    )
    return {key: getattr(app, method)() for key, method in accessors}
def get_menubar_items(active_app_pid=None):
    """Collect the menubar entries of one application.

    Args:
        active_app_pid: Process ID of the application to inspect, or None to
            use the frontmost app

    Returns:
        List of menubar item dictionaries with title, bounds, index, and app_pid.
        Empty when there is no frontmost app, or the application element,
        its menubar or the menubar's children cannot be obtained.
    """
    if active_app_pid is None:
        front = NSWorkspace.sharedWorkspace().frontmostApplication()
        if not front:
            return []
        active_app_pid = front.processIdentifier()

    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return []

    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return []

    entries = element_attribute(menubar, kAXChildrenAttribute)
    if entries is None:
        return []

    collected = []
    for index, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"

        # Geometry defaults to zero for anything that cannot be read.
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            bounds["x"] = getattr(point, "x", 0)
            bounds["y"] = getattr(point, "y", 0)

        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            bounds["width"] = getattr(extent, "width", 0)
            bounds["height"] = getattr(extent, "height", 0)

        collected.append(
            {"title": title, "bounds": bounds, "index": index, "app_pid": active_app_pid}
        )
    return collected
def get_dock_items():
    """Enumerate the entries shown in the macOS Dock.

    The Dock process is located among the running applications, its first
    ``AXList`` child is taken as the item container, and every child of that
    list is described.

    Returns:
        List of dock item dictionaries with title, description, bounds, index,
        type, role, and subrole information. Empty when the Dock process, its
        accessibility element, its list or the list's children are unavailable.
    """
    dock_pid = next(
        (
            candidate.processIdentifier()
            for candidate in NSWorkspace.sharedWorkspace().runningApplications()
            if candidate.localizedName() == "Dock"
            and candidate.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
    if dock_pid is None:
        return []

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return []

    top_level = element_attribute(dock_element, kAXChildrenAttribute)
    if top_level is None or len(top_level) == 0:
        return []

    # The first AXList child holds the actual Dock entries.
    container = next(
        (node for node in top_level if element_attribute(node, kAXRoleAttribute) == "AXList"),
        None,
    )
    if container is None:
        return []

    entries = element_attribute(container, kAXChildrenAttribute)
    if entries is None:
        return []

    # Subroles that map directly onto an item type.
    type_by_subrole = {
        "AXApplicationDockItem": "application",
        "AXFolderDockItem": "folder",
        "AXDocumentDockItem": "document",
        "AXSeparatorDockItem": "separator",
    }

    collected = []
    for index, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        description = element_attribute(entry, kAXDescriptionAttribute) or ""
        role = element_attribute(entry, kAXRoleAttribute) or ""
        subrole = element_attribute(entry, "AXSubrole") or ""

        # Geometry defaults to zero for anything that cannot be read.
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            bounds["x"] = getattr(point, "x", 0)
            bounds["y"] = getattr(point, "y", 0)

        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            bounds["width"] = getattr(extent, "width", 0)
            bounds["height"] = getattr(extent, "height", 0)

        # A known subrole wins; otherwise fall back to the role, then the title.
        item_type = type_by_subrole.get(subrole)
        if item_type is None:
            if role == "AXSeparator":
                item_type = "separator"
            elif "trash" in title.lower():
                item_type = "trash"
            else:
                item_type = "unknown"

        collected.append(
            {
                "title": title,
                "description": description,
                "bounds": bounds,
                "index": index,
                "type": item_type,
                "role": role,
                "subrole": subrole,
            }
        )
    return collected
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
    """Handler for macOS accessibility features and UI element inspection."""

    def get_desktop_state(self):
        """Get the current state of the desktop including windows, apps, menubar, and dock.

        On-screen windows are paired with accessibility trees by owning pid and
        by order: the n-th on-screen window of a pid receives the children of
        the n-th AX window tree of that pid.

        Returns:
            Dictionary containing applications, windows, menubar_items, and dock_items
        """
        windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
        running_apps = self.get_running_apps()
        applications = []
        pid_to_window_ids = {}

        # Build a mapping: pid -> list of AX window trees
        pid_to_ax_trees = {}
        for app in running_apps:
            pid = app.processIdentifier()
            try:
                app_elem = AXUIElementCreateApplication(pid)
                err, app_windows = AXUIElementCopyAttributeValue(
                    app_elem, kAXWindowsAttribute, None
                )
                trees = []
                if err == kAXErrorSuccess and app_windows:
                    for ax_win in app_windows:
                        # One failing window must not discard its siblings;
                        # the error is recorded in that window's slot.
                        try:
                            trees.append(UIElement(ax_win).to_dict())
                        except Exception as e:
                            trees.append({"error": str(e)})
                pid_to_ax_trees[pid] = trees
            except Exception as e:
                pid_to_ax_trees[pid] = [{"error": str(e)}]

        # Attach children by pid and index (order)
        pid_to_idx = {}
        for win in windows:
            pid = win["pid"]
            idx = pid_to_idx.get(pid, 0)
            ax_trees = pid_to_ax_trees.get(pid, [])
            win["children"] = (
                ax_trees[idx]["children"]
                if idx < len(ax_trees) and "children" in ax_trees[idx]
                else []
            )
            pid_to_idx[pid] = idx + 1
            pid_to_window_ids.setdefault(pid, []).append(win["id"])

        for app in running_apps:
            info = get_app_info(app)
            app_pid = info["pid"]
            applications.append({"info": info, "windows": pid_to_window_ids.get(app_pid, [])})

        menubar_items = get_menubar_items()
        dock_items = get_dock_items()
        return {
            "applications": applications,
            "windows": windows,
            "menubar_items": menubar_items,
            "dock_items": dock_items,
        }

    def get_application_windows(self, pid: int):
        """Get all windows for a specific application.

        Args:
            pid: Process ID of the application

        Returns:
            List of accessibility window elements or empty list if none found
        """
        try:
            app = AXUIElementCreateApplication(pid)
            err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
            if err == kAXErrorSuccess and windows:
                if isinstance(windows, Foundation.NSArray):  # type: ignore
                    return windows
            return []
        except Exception:
            # Best effort: any accessibility failure means "no windows".
            # (Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.)
            return []

    def get_all_windows(self):
        """Get all visible windows in the system.

        Only applications whose activation policy is 0 (regular) are included.

        Returns:
            List of window dictionaries with app information and window details
        """
        try:
            windows = []
            running_apps = self.get_running_apps()
            for app in running_apps:
                try:
                    app_name = app.localizedName()
                    pid = app.processIdentifier()
                    # Skip system processes and background apps
                    if app.activationPolicy() != 0:  # NSApplicationActivationPolicyRegular
                        continue
                    # Get application windows
                    app_windows = self.get_application_windows(pid)
                    windows.append(
                        {
                            "app_name": app_name,
                            "pid": pid,
                            "frontmost": app.isActive(),
                            "has_windows": len(app_windows) > 0,
                            "windows": app_windows,
                        }
                    )
                except Exception:
                    # Skip applications that cannot be inspected.
                    continue
            return windows
        except Exception:
            return []

    def get_running_apps(self):
        """Get all currently running applications.

        Returns:
            List of NSRunningApplication objects
        """
        # From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications
        # "Similar to the NSRunningApplication class's properties, this property will only change when the main run loop runs in a common mode"
        # So we need to run the main run loop to get the latest running applications
        Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False)  # type: ignore
        return NSWorkspace.sharedWorkspace().runningApplications()

    def get_ax_attribute(self, element, attribute):
        """Get an accessibility attribute from an element.

        Args:
            element: The accessibility element
            attribute: The attribute name to retrieve

        Returns:
            The attribute value or None if not found
        """
        return element_attribute(element, attribute)

    def serialize_node(self, element):
        """Create a serializable dictionary representation of an accessibility element.

        Args:
            element: The accessibility element to serialize

        Returns:
            Dictionary containing role, title and value, plus ``position``
            (``x``/``y``) and ``size`` (``width``/``height``) when they can be read
        """
        result = {}

        # Get basic attributes
        result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
        result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
        result["value"] = self.get_ax_attribute(element, kAXValueAttribute)

        # Position and size come back as AXValue wrappers, which cannot be
        # indexed directly; unpack them with element_value first and only fall
        # back to indexing for values that are already sequences.
        position = self.get_ax_attribute(element, kAXPositionAttribute)
        if position:
            try:
                point = element_value(position, kAXValueCGPointType)
                if point is not None:
                    result["position"] = {"x": point.x, "y": point.y}
                else:
                    result["position"] = {"x": position[0], "y": position[1]}
            except (IndexError, TypeError, ValueError, AttributeError):
                pass

        size = self.get_ax_attribute(element, kAXSizeAttribute)
        if size:
            try:
                extent = element_value(size, kAXValueCGSizeType)
                if extent is not None:
                    result["size"] = {"width": extent.width, "height": extent.height}
                else:
                    result["size"] = {"width": size[0], "height": size[1]}
            except (IndexError, TypeError, ValueError, AttributeError):
                pass

        return result

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get the complete accessibility tree for the current desktop state.

        Returns:
            Dictionary containing success status and desktop state information,
            or success=False with an error message
        """
        try:
            desktop_state = self.get_desktop_state()
            return {"success": True, **desktop_state}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def find_element(
        self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
    ) -> Dict[str, Any]:
        """Find an accessibility element matching the specified criteria.

        The search is depth-first from the system-wide accessibility element
        and stops at the first match. Criteria left as None (or empty) are not
        checked.

        Args:
            role: The accessibility role to match (optional)
            title: The title to match (optional)
            value: The value to match, compared against ``str()`` of the element's value (optional)

        Returns:
            Dictionary containing success status and the found element (None if
            nothing matched) or an error message
        """
        try:
            system = AXUIElementCreateSystemWide()

            def match_element(element):
                """Check if an element matches the search criteria.

                Args:
                    element: The accessibility element to check

                Returns:
                    True if element matches all specified criteria, False otherwise
                """
                if role and self.get_ax_attribute(element, kAXRoleAttribute) != role:
                    return False
                if title and self.get_ax_attribute(element, kAXTitleAttribute) != title:
                    return False
                if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value:
                    return False
                return True

            def search_tree(element):
                """Recursively search the accessibility tree for matching elements.

                Args:
                    element: The accessibility element to search from

                Returns:
                    Serialized element dictionary if match found, None otherwise
                """
                if match_element(element):
                    return self.serialize_node(element)
                children = self.get_ax_attribute(element, kAXChildrenAttribute)
                if children:
                    for child in children:
                        result = search_tree(child)
                        if result:
                            return result
                return None

            element = search_tree(system)
            return {"success": True, "element": element}
        except Exception as e:
            return {"success": False, "error": str(e)}
class MacOSAutomationHandler(BaseAutomationHandler):
    """Handler for macOS automation including mouse, keyboard, and screen operations.

    Every public coroutine returns a dictionary with a boolean ``"success"`` key.
    Failures are reported as ``{"success": False, "error": <message>}`` instead of
    being raised to the caller.
    """

    # Input controllers are created once, at class-definition time, and are
    # therefore shared by every instance of the handler.
    mouse = MouseController()
    keyboard = KeyboardController()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_button(button: str):
        """Map a button name to a ``Button`` member ("left", "right", anything else -> middle)."""
        if button == "left":
            return Button.left
        if button == "right":
            return Button.right
        return Button.middle

    def _release_quietly(self, btn) -> None:
        """Best-effort release of *btn* so a failed drag does not leave the button held."""
        try:
            self.mouse.release(btn)
        except Exception:
            # Cleanup only: the caller reports the error that triggered it.
            pass

    # ------------------------------------------------------------------
    # Mouse Actions
    # ------------------------------------------------------------------
    async def mouse_down(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Press and hold a mouse button at the specified coordinates.

        Args:
            x: X coordinate (optional; the cursor is moved only when both x and y are given)
            y: Y coordinate (optional; the cursor is moved only when both x and y are given)
            button: Mouse button to press ("left", "right", or "middle")

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.press(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Release a mouse button at the specified coordinates.

        Args:
            x: X coordinate (optional; the cursor is moved only when both x and y are given)
            y: Y coordinate (optional; the cursor is moved only when both x and y are given)
            button: Mouse button to release ("left", "right", or "middle")

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.release(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a left mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional; the cursor is moved only when both x and y are given)
            y: Y coordinate (optional; the cursor is moved only when both x and y are given)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Perform a right mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional; the cursor is moved only when both x and y are given)
            y: Y coordinate (optional; the cursor is moved only when both x and y are given)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.right, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Perform a double left mouse click at the specified coordinates.

        Args:
            x: X coordinate (optional; the cursor is moved only when both x and y are given)
            y: Y coordinate (optional; the cursor is moved only when both x and y are given)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 2)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the mouse cursor to the specified coordinates.

        Args:
            x: Target X coordinate
            y: Target Y coordinate

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.position = (x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from current position to target coordinates.

        The button is pressed at the current cursor position, the cursor is moved
        to the target in 20 evenly spaced steps, and the button is then released.

        Args:
            x: Target X coordinate
            y: Target Y coordinate
            button: Mouse button to use for dragging ("left", "right", or "middle")
            duration: Duration of the drag operation in seconds

        Returns:
            Dictionary containing success status and error message if failed
        """
        steps = 20
        try:
            btn = self._resolve_button(button)
            try:
                self.mouse.press(btn)
                start_x, start_y = self.mouse.position
                dx = (x - start_x) / steps
                dy = (y - start_y) / steps
                pause = duration / steps
                for i in range(1, steps + 1):
                    self.mouse.position = (int(start_x + dx * i), int(start_y + dy * i))
                    # Yield to the event loop between steps instead of blocking it.
                    await asyncio.sleep(pause)
            except BaseException:
                # Also covers task cancellation: never leave the button held down.
                self._release_quietly(btn)
                raise
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag the mouse along a specified path of coordinates.

        The cursor is moved to the first point, the button is pressed, each
        remaining point is visited in order, and the button is then released.

        Args:
            path: List of (x, y) coordinate tuples defining the drag path (at least 2 points)
            button: Mouse button to use for dragging ("left", "right", or "middle")
            duration: Total duration of the drag operation in seconds

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            if not path or len(path) < 2:
                return {"success": False, "error": "Path must contain at least 2 points"}
            btn = self._resolve_button(button)
            # The path is known to have >= 2 points here, so the divisor is >= 1.
            step_duration = duration / (len(path) - 1)
            try:
                # Move to the first point before pressing.
                self.mouse.position = path[0]
                self.mouse.press(btn)
                for x, y in path[1:]:
                    self.mouse.position = (x, y)
                    # Yield to the event loop between points instead of blocking it.
                    await asyncio.sleep(step_duration)
            except BaseException:
                # Also covers task cancellation: never leave the button held down.
                self._release_quietly(btn)
                raise
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Keyboard Actions
    # ------------------------------------------------------------------
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold a keyboard key.

        Args:
            key: Key name to press (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release a keyboard key.

        Args:
            key: Key name to release (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type text using the keyboard with Unicode support.

        Args:
            text: Text string to type

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release a keyboard key.

        Args:
            key: Key name to press (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press a combination of keys simultaneously.

        Args:
            keys: List of key names to press together (using pyautogui key names)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            # use pyautogui for their key names
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Scrolling Actions
    # ------------------------------------------------------------------
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll the mouse wheel in the specified direction.

        Args:
            x: Horizontal scroll amount
            y: Vertical scroll amount (positive for up, negative for down)

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks.

        Args:
            clicks: Number of scroll clicks to perform

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(0, -clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks.

        Args:
            clicks: Number of scroll clicks to perform

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            self.mouse.scroll(0, clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Screen Actions
    # ------------------------------------------------------------------
    async def screenshot(self) -> Dict[str, Any]:
        """Capture a screenshot of the current screen.

        Images wider than 1920 pixels are scaled down to that width, keeping the
        aspect ratio, before being encoded as PNG.

        Returns:
            Dictionary containing success status and base64-encoded image data or error message
        """
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            # Resize image to reduce size (max width 1920, maintain aspect ratio)
            max_width = 1920
            if screenshot.width > max_width:
                ratio = max_width / screenshot.width
                new_height = int(screenshot.height * ratio)
                screenshot = screenshot.resize((max_width, new_height), Image.Resampling.LANCZOS)

            buffered = BytesIO()
            # Use PNG format with optimization to reduce file size
            screenshot.save(buffered, format="PNG", optimize=True)
            # getvalue() returns the whole buffer regardless of the stream position.
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the dimensions of the current screen.

        Returns:
            Dictionary containing success status and screen size or error message
        """
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current position of the mouse cursor.

        Returns:
            Dictionary containing success status and cursor position or error message
        """
        try:
            x, y = self.mouse.position
            return {"success": True, "position": {"x": x, "y": y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Clipboard Actions
    # ------------------------------------------------------------------
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Get the current content of the system clipboard.

        Despite its name, this method only reads the clipboard (``pyperclip.paste()``);
        it does not trigger a copy.

        Returns:
            Dictionary containing success status and clipboard content or error message
        """
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the content of the system clipboard.

        Args:
            text: Text to copy to the clipboard

        Returns:
            Dictionary containing success status and error message if failed
        """
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        ``"success"`` only reflects that the command could be started and awaited;
        a non-zero exit status is reported through ``"return_code"``.

        Args:
            command: Shell command to execute

        Returns:
            Dictionary containing success status, stdout, stderr, and return code
        """
        try:
            # NOTE(review): the string is handed to the shell unmodified, so callers
            # must only pass commands they trust.
            process = await asyncio.create_subprocess_shell(
                command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Decode leniently so non-UTF-8 output does not turn a finished
            # command into a reported failure.
            return {
                "success": True,
                "stdout": stdout.decode(errors="replace") if stdout else "",
                "stderr": stderr.decode(errors="replace") if stderr else "",
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
```