This is page 9 of 16. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .all-contributorsrc ├── .cursorignore ├── .devcontainer │ ├── devcontainer.json │ ├── post-install.sh │ └── README.md ├── .dockerignore ├── .gitattributes ├── .github │ ├── FUNDING.yml │ ├── scripts │ │ ├── get_pyproject_version.py │ │ └── tests │ │ ├── __init__.py │ │ ├── README.md │ │ └── test_get_pyproject_version.py │ └── workflows │ ├── ci-lume.yml │ ├── docker-publish-kasm.yml │ ├── docker-publish-xfce.yml │ ├── docker-reusable-publish.yml │ ├── npm-publish-computer.yml │ ├── npm-publish-core.yml │ ├── publish-lume.yml │ ├── pypi-publish-agent.yml │ ├── pypi-publish-computer-server.yml │ ├── pypi-publish-computer.yml │ ├── pypi-publish-core.yml │ ├── pypi-publish-mcp-server.yml │ ├── pypi-publish-pylume.yml │ ├── pypi-publish-som.yml │ ├── pypi-reusable-publish.yml │ └── test-validation-script.yml ├── .gitignore ├── .vscode │ ├── docs.code-workspace │ ├── launch.json │ ├── libs-ts.code-workspace │ ├── lume.code-workspace │ ├── lumier.code-workspace │ ├── py.code-workspace │ └── settings.json ├── blog │ ├── app-use.md │ ├── assets │ │ ├── composite-agents.png │ │ ├── docker-ubuntu-support.png │ │ ├── hack-booth.png │ │ ├── hack-closing-ceremony.jpg │ │ ├── hack-cua-ollama-hud.jpeg │ │ ├── hack-leaderboard.png │ │ ├── hack-the-north.png │ │ ├── hack-winners.jpeg │ │ ├── hack-workshop.jpeg │ │ ├── hud-agent-evals.png │ │ └── trajectory-viewer.jpeg │ ├── bringing-computer-use-to-the-web.md │ ├── build-your-own-operator-on-macos-1.md │ ├── build-your-own-operator-on-macos-2.md │ ├── composite-agents.md │ ├── cua-hackathon.md │ ├── hack-the-north.md │ ├── hud-agent-evals.md │ ├── human-in-the-loop.md │ ├── introducing-cua-cloud-containers.md │ ├── lume-to-containerization.md │ ├── sandboxed-python-execution.md │ ├── training-computer-use-models-trajectories-1.md │ ├── trajectory-viewer.md │ ├── ubuntu-docker-support.md │ └── 
windows-sandbox.md ├── CONTRIBUTING.md ├── Development.md ├── Dockerfile ├── docs │ ├── .gitignore │ ├── .prettierrc │ ├── content │ │ └── docs │ │ ├── agent-sdk │ │ │ ├── agent-loops.mdx │ │ │ ├── benchmarks │ │ │ │ ├── index.mdx │ │ │ │ ├── interactive.mdx │ │ │ │ ├── introduction.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── osworld-verified.mdx │ │ │ │ ├── screenspot-pro.mdx │ │ │ │ └── screenspot-v2.mdx │ │ │ ├── callbacks │ │ │ │ ├── agent-lifecycle.mdx │ │ │ │ ├── cost-saving.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── logging.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── pii-anonymization.mdx │ │ │ │ └── trajectories.mdx │ │ │ ├── chat-history.mdx │ │ │ ├── custom-computer-handlers.mdx │ │ │ ├── custom-tools.mdx │ │ │ ├── customizing-computeragent.mdx │ │ │ ├── integrations │ │ │ │ ├── hud.mdx │ │ │ │ └── meta.json │ │ │ ├── message-format.mdx │ │ │ ├── meta.json │ │ │ ├── migration-guide.mdx │ │ │ ├── prompt-caching.mdx │ │ │ ├── supported-agents │ │ │ │ ├── composed-agents.mdx │ │ │ │ ├── computer-use-agents.mdx │ │ │ │ ├── grounding-models.mdx │ │ │ │ ├── human-in-the-loop.mdx │ │ │ │ └── meta.json │ │ │ ├── supported-model-providers │ │ │ │ ├── index.mdx │ │ │ │ └── local-models.mdx │ │ │ └── usage-tracking.mdx │ │ ├── computer-sdk │ │ │ ├── cloud-vm-management.mdx │ │ │ ├── commands.mdx │ │ │ ├── computer-ui.mdx │ │ │ ├── computers.mdx │ │ │ ├── meta.json │ │ │ └── sandboxed-python.mdx │ │ ├── index.mdx │ │ ├── libraries │ │ │ ├── agent │ │ │ │ └── index.mdx │ │ │ ├── computer │ │ │ │ └── index.mdx │ │ │ ├── computer-server │ │ │ │ ├── Commands.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── REST-API.mdx │ │ │ │ └── WebSocket-API.mdx │ │ │ ├── core │ │ │ │ └── index.mdx │ │ │ ├── lume │ │ │ │ ├── cli-reference.mdx │ │ │ │ ├── faq.md │ │ │ │ ├── http-api.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── meta.json │ │ │ │ └── prebuilt-images.mdx │ │ │ ├── lumier │ │ │ │ ├── building-lumier.mdx │ │ │ │ ├── docker-compose.mdx │ │ │ │ ├── docker.mdx │ │ │ │ ├── index.mdx 
│ │ │ │ ├── installation.mdx │ │ │ │ └── meta.json │ │ │ ├── mcp-server │ │ │ │ ├── client-integrations.mdx │ │ │ │ ├── configuration.mdx │ │ │ │ ├── index.mdx │ │ │ │ ├── installation.mdx │ │ │ │ ├── llm-integrations.mdx │ │ │ │ ├── meta.json │ │ │ │ ├── tools.mdx │ │ │ │ └── usage.mdx │ │ │ └── som │ │ │ ├── configuration.mdx │ │ │ └── index.mdx │ │ ├── meta.json │ │ ├── quickstart-cli.mdx │ │ ├── quickstart-devs.mdx │ │ └── telemetry.mdx │ ├── next.config.mjs │ ├── package-lock.json │ ├── package.json │ ├── pnpm-lock.yaml │ ├── postcss.config.mjs │ ├── public │ │ └── img │ │ ├── agent_gradio_ui.png │ │ ├── agent.png │ │ ├── cli.png │ │ ├── computer.png │ │ ├── som_box_threshold.png │ │ └── som_iou_threshold.png │ ├── README.md │ ├── source.config.ts │ ├── src │ │ ├── app │ │ │ ├── (home) │ │ │ │ ├── [[...slug]] │ │ │ │ │ └── page.tsx │ │ │ │ └── layout.tsx │ │ │ ├── api │ │ │ │ └── search │ │ │ │ └── route.ts │ │ │ ├── favicon.ico │ │ │ ├── global.css │ │ │ ├── layout.config.tsx │ │ │ ├── layout.tsx │ │ │ ├── llms.mdx │ │ │ │ └── [[...slug]] │ │ │ │ └── route.ts │ │ │ └── llms.txt │ │ │ └── route.ts │ │ ├── assets │ │ │ ├── discord-black.svg │ │ │ ├── discord-white.svg │ │ │ ├── logo-black.svg │ │ │ └── logo-white.svg │ │ ├── components │ │ │ ├── iou.tsx │ │ │ └── mermaid.tsx │ │ ├── lib │ │ │ ├── llms.ts │ │ │ └── source.ts │ │ └── mdx-components.tsx │ └── tsconfig.json ├── examples │ ├── agent_examples.py │ ├── agent_ui_examples.py │ ├── cloud_api_examples.py │ ├── computer_examples_windows.py │ ├── computer_examples.py │ ├── computer_ui_examples.py │ ├── computer-example-ts │ │ ├── .env.example │ │ ├── .gitignore │ │ ├── .prettierrc │ │ ├── package-lock.json │ │ ├── package.json │ │ ├── pnpm-lock.yaml │ │ ├── README.md │ │ ├── src │ │ │ ├── helpers.ts │ │ │ └── index.ts │ │ └── tsconfig.json │ ├── docker_examples.py │ ├── evals │ │ ├── hud_eval_examples.py │ │ └── wikipedia_most_linked.txt │ ├── pylume_examples.py │ ├── sandboxed_functions_examples.py │ ├── 
som_examples.py │ ├── utils.py │ └── winsandbox_example.py ├── img │ ├── agent_gradio_ui.png │ ├── agent.png │ ├── cli.png │ ├── computer.png │ ├── logo_black.png │ └── logo_white.png ├── libs │ ├── kasm │ │ ├── Dockerfile │ │ ├── LICENSE │ │ ├── README.md │ │ └── src │ │ └── ubuntu │ │ └── install │ │ └── firefox │ │ ├── custom_startup.sh │ │ ├── firefox.desktop │ │ └── install_firefox.sh │ ├── lume │ │ ├── .cursorignore │ │ ├── CONTRIBUTING.md │ │ ├── Development.md │ │ ├── img │ │ │ └── cli.png │ │ ├── Package.resolved │ │ ├── Package.swift │ │ ├── README.md │ │ ├── resources │ │ │ └── lume.entitlements │ │ ├── scripts │ │ │ ├── build │ │ │ │ ├── build-debug.sh │ │ │ │ ├── build-release-notarized.sh │ │ │ │ └── build-release.sh │ │ │ └── install.sh │ │ ├── src │ │ │ ├── Commands │ │ │ │ ├── Clone.swift │ │ │ │ ├── Config.swift │ │ │ │ ├── Create.swift │ │ │ │ ├── Delete.swift │ │ │ │ ├── Get.swift │ │ │ │ ├── Images.swift │ │ │ │ ├── IPSW.swift │ │ │ │ ├── List.swift │ │ │ │ ├── Logs.swift │ │ │ │ ├── Options │ │ │ │ │ └── FormatOption.swift │ │ │ │ ├── Prune.swift │ │ │ │ ├── Pull.swift │ │ │ │ ├── Push.swift │ │ │ │ ├── Run.swift │ │ │ │ ├── Serve.swift │ │ │ │ ├── Set.swift │ │ │ │ └── Stop.swift │ │ │ ├── ContainerRegistry │ │ │ │ ├── ImageContainerRegistry.swift │ │ │ │ ├── ImageList.swift │ │ │ │ └── ImagesPrinter.swift │ │ │ ├── Errors │ │ │ │ └── Errors.swift │ │ │ ├── FileSystem │ │ │ │ ├── Home.swift │ │ │ │ ├── Settings.swift │ │ │ │ ├── VMConfig.swift │ │ │ │ ├── VMDirectory.swift │ │ │ │ └── VMLocation.swift │ │ │ ├── LumeController.swift │ │ │ ├── Main.swift │ │ │ ├── Server │ │ │ │ ├── Handlers.swift │ │ │ │ ├── HTTP.swift │ │ │ │ ├── Requests.swift │ │ │ │ ├── Responses.swift │ │ │ │ └── Server.swift │ │ │ ├── Utils │ │ │ │ ├── CommandRegistry.swift │ │ │ │ ├── CommandUtils.swift │ │ │ │ ├── Logger.swift │ │ │ │ ├── NetworkUtils.swift │ │ │ │ ├── Path.swift │ │ │ │ ├── ProcessRunner.swift │ │ │ │ ├── ProgressLogger.swift │ │ │ │ ├── String.swift 
│ │ │ │ └── Utils.swift │ │ │ ├── Virtualization │ │ │ │ ├── DarwinImageLoader.swift │ │ │ │ ├── DHCPLeaseParser.swift │ │ │ │ ├── ImageLoaderFactory.swift │ │ │ │ └── VMVirtualizationService.swift │ │ │ ├── VM │ │ │ │ ├── DarwinVM.swift │ │ │ │ ├── LinuxVM.swift │ │ │ │ ├── VM.swift │ │ │ │ ├── VMDetails.swift │ │ │ │ ├── VMDetailsPrinter.swift │ │ │ │ ├── VMDisplayResolution.swift │ │ │ │ └── VMFactory.swift │ │ │ └── VNC │ │ │ ├── PassphraseGenerator.swift │ │ │ └── VNCService.swift │ │ └── tests │ │ ├── Mocks │ │ │ ├── MockVM.swift │ │ │ ├── MockVMVirtualizationService.swift │ │ │ └── MockVNCService.swift │ │ ├── VM │ │ │ └── VMDetailsPrinterTests.swift │ │ ├── VMTests.swift │ │ ├── VMVirtualizationServiceTests.swift │ │ └── VNCServiceTests.swift │ ├── lumier │ │ ├── .dockerignore │ │ ├── Dockerfile │ │ ├── README.md │ │ └── src │ │ ├── bin │ │ │ └── entry.sh │ │ ├── config │ │ │ └── constants.sh │ │ ├── hooks │ │ │ └── on-logon.sh │ │ └── lib │ │ ├── utils.sh │ │ └── vm.sh │ ├── python │ │ ├── agent │ │ │ ├── .bumpversion.cfg │ │ │ ├── agent │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── adapters │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── huggingfacelocal_adapter.py │ │ │ │ │ ├── human_adapter.py │ │ │ │ │ ├── mlxvlm_adapter.py │ │ │ │ │ └── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── qwen2_5_vl.py │ │ │ │ ├── agent.py │ │ │ │ ├── callbacks │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── budget_manager.py │ │ │ │ │ ├── image_retention.py │ │ │ │ │ ├── logging.py │ │ │ │ │ ├── operator_validator.py │ │ │ │ │ ├── pii_anonymization.py │ │ │ │ │ ├── prompt_instructions.py │ │ │ │ │ ├── telemetry.py │ │ │ │ │ └── trajectory_saver.py │ │ │ │ ├── cli.py │ │ │ │ ├── computers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cua.py │ │ │ │ │ └── custom.py │ │ │ │ ├── decorators.py │ │ │ │ ├── human_tool │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py 
│ │ │ │ │ ├── server.py │ │ │ │ │ └── ui.py │ │ │ │ ├── integrations │ │ │ │ │ └── hud │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── agent.py │ │ │ │ │ └── proxy.py │ │ │ │ ├── loops │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── anthropic.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── composed_grounded.py │ │ │ │ │ ├── gemini.py │ │ │ │ │ ├── glm45v.py │ │ │ │ │ ├── gta1.py │ │ │ │ │ ├── holo.py │ │ │ │ │ ├── internvl.py │ │ │ │ │ ├── model_types.csv │ │ │ │ │ ├── moondream3.py │ │ │ │ │ ├── omniparser.py │ │ │ │ │ ├── openai.py │ │ │ │ │ ├── opencua.py │ │ │ │ │ └── uitars.py │ │ │ │ ├── proxy │ │ │ │ │ ├── examples.py │ │ │ │ │ └── handlers.py │ │ │ │ ├── responses.py │ │ │ │ ├── types.py │ │ │ │ └── ui │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ └── gradio │ │ │ │ ├── __init__.py │ │ │ │ ├── app.py │ │ │ │ └── ui_components.py │ │ │ ├── benchmarks │ │ │ │ ├── .gitignore │ │ │ │ ├── contrib.md │ │ │ │ ├── interactive.py │ │ │ │ ├── models │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ └── gta1.py │ │ │ │ ├── README.md │ │ │ │ ├── ss-pro.py │ │ │ │ ├── ss-v2.py │ │ │ │ └── utils.py │ │ │ ├── example.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer │ │ │ │ ├── __init__.py │ │ │ │ ├── computer.py │ │ │ │ ├── diorama_computer.py │ │ │ │ ├── helpers.py │ │ │ │ ├── interface │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ ├── models.py │ │ │ │ │ └── windows.py │ │ │ │ ├── logger.py │ │ │ │ ├── models.py │ │ │ │ ├── providers │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── cloud │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── docker │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── lume │ │ │ │ │ │ ├── __init__.py │ │ │ │ │ │ └── provider.py │ │ │ │ │ ├── lume_api.py │ │ │ │ │ ├── lumier │ │ │ │ │ │ ├── __init__.py │ │ │ │ 
│ │ └── provider.py │ │ │ │ │ ├── types.py │ │ │ │ │ └── winsandbox │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── provider.py │ │ │ │ │ └── setup_script.ps1 │ │ │ │ ├── ui │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── __main__.py │ │ │ │ │ └── gradio │ │ │ │ │ ├── __init__.py │ │ │ │ │ └── app.py │ │ │ │ └── utils.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── computer-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── computer_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── cli.py │ │ │ │ ├── diorama │ │ │ │ │ ├── __init__.py │ │ │ │ │ ├── base.py │ │ │ │ │ ├── diorama_computer.py │ │ │ │ │ ├── diorama.py │ │ │ │ │ ├── draw.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── safezone.py │ │ │ │ ├── handlers │ │ │ │ │ ├── base.py │ │ │ │ │ ├── factory.py │ │ │ │ │ ├── generic.py │ │ │ │ │ ├── linux.py │ │ │ │ │ ├── macos.py │ │ │ │ │ └── windows.py │ │ │ │ ├── main.py │ │ │ │ ├── server.py │ │ │ │ └── watchdog.py │ │ │ ├── examples │ │ │ │ ├── __init__.py │ │ │ │ └── usage_example.py │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ ├── run_server.py │ │ │ └── test_connection.py │ │ ├── core │ │ │ ├── .bumpversion.cfg │ │ │ ├── core │ │ │ │ ├── __init__.py │ │ │ │ └── telemetry │ │ │ │ ├── __init__.py │ │ │ │ └── posthog.py │ │ │ ├── poetry.toml │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ ├── mcp-server │ │ │ ├── .bumpversion.cfg │ │ │ ├── CONCURRENT_SESSIONS.md │ │ │ ├── mcp_server │ │ │ │ ├── __init__.py │ │ │ │ ├── __main__.py │ │ │ │ ├── server.py │ │ │ │ └── session_manager.py │ │ │ ├── pdm.lock │ │ │ ├── pyproject.toml │ │ │ ├── README.md │ │ │ └── scripts │ │ │ ├── install_mcp_server.sh │ │ │ └── start_mcp_server.sh │ │ ├── pylume │ │ │ ├── __init__.py │ │ │ ├── .bumpversion.cfg │ │ │ ├── pylume │ │ │ │ ├── __init__.py │ │ │ │ ├── client.py │ │ │ │ ├── exceptions.py │ │ │ │ ├── lume │ │ │ │ ├── models.py │ │ │ │ ├── pylume.py │ │ │ │ └── server.py │ │ │ ├── pyproject.toml │ │ │ └── README.md │ │ └── som │ │ ├── .bumpversion.cfg │ │ 
├── LICENSE │ │ ├── poetry.toml │ │ ├── pyproject.toml │ │ ├── README.md │ │ ├── som │ │ │ ├── __init__.py │ │ │ ├── detect.py │ │ │ ├── detection.py │ │ │ ├── models.py │ │ │ ├── ocr.py │ │ │ ├── util │ │ │ │ └── utils.py │ │ │ └── visualization.py │ │ └── tests │ │ └── test_omniparser.py │ ├── typescript │ │ ├── .gitignore │ │ ├── .nvmrc │ │ ├── agent │ │ │ ├── examples │ │ │ │ ├── playground-example.html │ │ │ │ └── README.md │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── client.ts │ │ │ │ ├── index.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ └── client.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── biome.json │ │ ├── computer │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── computer │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── providers │ │ │ │ │ │ ├── base.ts │ │ │ │ │ │ ├── cloud.ts │ │ │ │ │ │ └── index.ts │ │ │ │ │ └── types.ts │ │ │ │ ├── index.ts │ │ │ │ ├── interface │ │ │ │ │ ├── base.ts │ │ │ │ │ ├── factory.ts │ │ │ │ │ ├── index.ts │ │ │ │ │ ├── linux.ts │ │ │ │ │ ├── macos.ts │ │ │ │ │ └── windows.ts │ │ │ │ └── types.ts │ │ │ ├── tests │ │ │ │ ├── computer │ │ │ │ │ └── cloud.test.ts │ │ │ │ ├── interface │ │ │ │ │ ├── factory.test.ts │ │ │ │ │ ├── index.test.ts │ │ │ │ │ ├── linux.test.ts │ │ │ │ │ ├── macos.test.ts │ │ │ │ │ └── windows.test.ts │ │ │ │ └── setup.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── core │ │ │ ├── .editorconfig │ │ │ ├── .gitattributes │ │ │ ├── .gitignore │ │ │ ├── LICENSE │ │ │ ├── package.json │ │ │ ├── README.md │ │ │ ├── src │ │ │ │ ├── index.ts │ │ │ │ └── telemetry │ │ │ │ ├── clients │ │ │ │ │ ├── index.ts │ │ │ │ │ └── posthog.ts │ │ │ │ └── index.ts │ │ │ ├── tests │ │ │ │ └── telemetry.test.ts │ │ │ ├── tsconfig.json │ │ │ ├── tsdown.config.ts │ │ │ └── vitest.config.ts │ │ ├── package.json │ │ ├── 
We touched on Cua's computer-use capabilities in [Building your own Operator on macOS - Part 2](build-your-own-operator-on-macos-2.md) – your AI agents can click, scroll, type, and interact with any desktop application.
But what if your agent needs to do more than just UI automation? What if it needs to process data, make API calls, analyze images, or run complex logic alongside those UI interactions, within the same virtual environment? That's where Cua's `@sandboxed` decorator comes in. While Cua handles the clicking and typing, sandboxed execution lets you run full Python code inside the same virtual environment. It's like giving your AI agents a programming brain to complement their clicking fingers. Think of it as the perfect marriage: Cua handles the "what you see" (UI interactions), while sandboxed Python handles the "what you compute" (data processing, logic, API calls) – all happening in the same isolated environment. ## So, what exactly is sandboxed execution? Cua excels at automating user interfaces – clicking buttons, filling forms, navigating applications. But modern AI agents need to do more than just UI automation. They need to process the data they collect, make intelligent decisions, call external APIs, and run sophisticated algorithms. Sandboxed execution bridges this gap. You write a Python function, decorate it with `@sandboxed`, and it runs inside your Cua container alongside your UI automation. Your agent can now click a button, extract some data, process it with Python, and then use those results to decide what to click next. 
Here's what makes this combination powerful for AI agent development: - **Unified environment**: Your UI automation and code execution happen in the same container - **Rich capabilities**: Combine Cua's clicking with Python's data processing, API calls, and libraries - **Seamless integration**: Pass data between UI interactions and Python functions effortlessly - **Cross-platform consistency**: Your Python code runs the same way across different Cua environments - **Complete workflows**: Build agents that can both interact with apps AND process the data they collect ## The architecture behind @sandboxed Let's jump right into an example that'll make this crystal clear: ```python from computer.helpers import sandboxed @sandboxed("demo_venv") def greet_and_print(name): """This function runs inside the container""" import PyXA # macOS-specific library safari = PyXA.Application("Safari") html = safari.current_document.source() print(f"Hello from inside the container, {name}!") return {"greeted": name, "safari_html": html} # When called, this executes in the container result = await greet_and_print("Cua") ``` What's happening here? When you call `greet_and_print()`, Cua extracts the function's source code, transmits it to the container, and executes it there. The result returns to you seamlessly, while the actual execution remains completely isolated. ## How does sandboxed execution work? Cua's sandboxed execution system employs several key architectural components: ### 1. Source Code Extraction Cua uses Python's `inspect.getsource()` to extract your function's source code and reconstruct the function definition in the remote environment. ### 2. Virtual Environment Isolation Each sandboxed function runs in a named virtual environment within the container. This provides complete dependency isolation between different functions and their respective environments. ### 3. 
Data Serialization and Transport Arguments and return values are serialized as JSON and transported between the host and container. This ensures compatibility across different Python versions and execution environments. ### 4. Comprehensive Error Handling The system captures both successful results and exceptions, preserving stack traces and error information for debugging purposes. ## Getting your sandbox ready Setting up sandboxed execution is simple: ```python import asyncio from computer.computer import Computer from computer.helpers import sandboxed, set_default_computer async def main(): # Fire up the computer computer = Computer() await computer.run() # Make it the default for all sandboxed functions set_default_computer(computer) # Install some packages in a virtual environment await computer.venv_install("demo_venv", ["requests", "beautifulsoup4"]) ``` If you want to get fancy, you can specify which computer instance to use: ```python @sandboxed("my_venv", computer=my_specific_computer) def my_function(): # This runs on your specified computer instance pass ``` ## Real-world examples that actually work ### Browser automation without the headaches Ever tried to automate a browser and had it crash your entire system? Yeah, us too. Here's how to do it safely: ```python @sandboxed("browser_env") def automate_browser_with_playwright(): """Automate browser interactions using Playwright""" from playwright.sync_api import sync_playwright import time import base64 from datetime import datetime try: with sync_playwright() as p: # Launch browser (visible, because why not?) 
browser = p.chromium.launch( headless=False, args=['--no-sandbox', '--disable-dev-shm-usage'] ) page = browser.new_page() page.set_viewport_size({"width": 1280, "height": 720}) actions = [] screenshots = {} # Let's visit example.com and poke around page.goto("https://example.com") actions.append("Navigated to example.com") # Grab a screenshot because screenshots are cool screenshot_bytes = page.screenshot(full_page=True) screenshots["initial"] = base64.b64encode(screenshot_bytes).decode() # Get some basic info title = page.title() actions.append(f"Page title: {title}") # Find links and headings try: links = page.locator("a").all() link_texts = [link.text_content() for link in links[:5]] actions.append(f"Found {len(links)} links: {link_texts}") headings = page.locator("h1, h2, h3").all() heading_texts = [h.text_content() for h in headings[:3]] actions.append(f"Found headings: {heading_texts}") except Exception as e: actions.append(f"Element interaction error: {str(e)}") # Let's try a form for good measure try: page.goto("https://httpbin.org/forms/post") actions.append("Navigated to form page") # Fill out the form page.fill('input[name="custname"]', "Test User from Sandboxed Environment") page.fill('input[name="custtel"]', "555-0123") page.fill('input[name="custemail"]', "[email protected]") page.select_option('select[name="size"]', "large") actions.append("Filled out form fields") # Submit and see what happens page.click('input[type="submit"]') page.wait_for_load_state("networkidle") actions.append("Submitted form") except Exception as e: actions.append(f"Form interaction error: {str(e)}") browser.close() return { "actions_performed": actions, "screenshots": screenshots, "success": True } except Exception as e: return {"error": f"Browser automation failed: {str(e)}"} # Install Playwright and its browsers await computer.venv_install("browser_env", ["playwright"]) await computer.venv_cmd("browser_env", "playwright install chromium") # Run the automation result = await 
automate_browser_with_playwright() print(f"Performed {len(result.get('actions_performed', []))} actions") ``` ### Building code analysis agents Want to build agents that can analyze code safely? Here's a security audit tool that won't accidentally `eval()` your system into oblivion: ```python @sandboxed("analysis_env") def security_audit_tool(code_snippet): """Analyze code for potential security issues""" import ast import re issues = [] # Check for the usual suspects dangerous_patterns = [ (r'eval\s*\(', "Use of eval() function"), (r'exec\s*\(', "Use of exec() function"), (r'__import__\s*\(', "Dynamic import usage"), (r'subprocess\.', "Subprocess usage"), (r'os\.system\s*\(', "OS system call"), ] for pattern, description in dangerous_patterns: if re.search(pattern, code_snippet): issues.append(description) # Get fancy with AST analysis try: tree = ast.parse(code_snippet) for node in ast.walk(tree): if isinstance(node, ast.Call): if hasattr(node.func, 'id'): if node.func.id in ['eval', 'exec', 'compile']: issues.append(f"Dangerous function call: {node.func.id}") except SyntaxError: issues.append("Syntax error in code") return { "security_issues": issues, "risk_level": "HIGH" if len(issues) > 2 else "MEDIUM" if issues else "LOW" } # Test it on some sketchy code audit_result = await security_audit_tool("eval(user_input)") print(f"Security audit: {audit_result}") ``` ### Desktop automation in the cloud Here's where things get really interesting. 
Cua Cloud Sandbox comes with full desktop environments, so you can automate GUIs: ```python @sandboxed("desktop_env") def take_screenshot_and_analyze(): """Take a screenshot and analyze the desktop""" import io import base64 from PIL import ImageGrab from datetime import datetime try: # Grab the screen screenshot = ImageGrab.grab() # Convert to base64 for easy transport buffer = io.BytesIO() screenshot.save(buffer, format='PNG') screenshot_data = base64.b64encode(buffer.getvalue()).decode() # Get some basic info screen_info = { "size": screenshot.size, "mode": screenshot.mode, "timestamp": datetime.now().isoformat() } # Analyze the colors (because why not?) colors = screenshot.getcolors(maxcolors=256*256*256) dominant_color = max(colors, key=lambda x: x[0])[1] if colors else None return { "screenshot_base64": screenshot_data, "screen_info": screen_info, "dominant_color": dominant_color, "unique_colors": len(colors) if colors else 0 } except Exception as e: return {"error": f"Screenshot failed: {str(e)}"} # Install the dependencies await computer.venv_install("desktop_env", ["Pillow"]) # Take and analyze a screenshot result = await take_screenshot_and_analyze() print("Desktop analysis complete!") ``` ## Pro tips for sandboxed success ### Keep it self-contained Always put your imports inside the function. 
Trust us on this one: ```python @sandboxed("good_env") def good_function(): import os # Import inside the function import json # Your code here return {"result": "success"} ``` ### Install dependencies first Don't forget to install packages before using them: ```python # Install first await computer.venv_install("my_env", ["pandas", "numpy", "matplotlib"]) @sandboxed("my_env") def data_analysis(): import pandas as pd import numpy as np # Now you can use them ``` ### Use descriptive environment names Future you will thank you: ```python @sandboxed("data_processing_env") def process_data(): pass @sandboxed("web_scraping_env") def scrape_site(): pass @sandboxed("ml_training_env") def train_model(): pass ``` ### Always handle errors gracefully Things break. Plan for it: ```python @sandboxed("robust_env") def robust_function(data): try: result = process_data(data) return {"success": True, "result": result} except Exception as e: return {"success": False, "error": str(e)} ``` ## What about performance? Let's be honest – there's some overhead here. Code needs to be serialized, sent over the network, and executed remotely. But for most use cases, the benefits far outweigh the costs. If you're building something performance-critical, consider: - Batching multiple operations into a single sandboxed function - Minimizing data transfer between host and container - Using persistent virtual environments ## The security angle This is where sandboxed execution really shines: 1. **Complete process isolation** – code runs in a separate container 2. **File system protection** – limited access to your host files 3. **Network isolation** – controlled network access 4. **Clean environments** – no package conflicts or pollution 5. **Resource limits** – container-level constraints keep things in check ## Ready to get started? The `@sandboxed` decorator is one of those features that sounds simple but opens up a world of possibilities. 
/// Snapshot of the application's persisted configuration: the set of VM
/// storage locations, which one is the default, and cache preferences.
struct LumeSettings: Codable, Sendable {
    // Property order is preserved for synthesized Codable encoding.
    var vmLocations: [VMLocation]
    var defaultLocationName: String
    var cacheDirectory: String
    var cachingEnabled: Bool

    /// The location whose name matches `defaultLocationName`, if registered.
    var defaultLocation: VMLocation? {
        vmLocations.first(where: { $0.name == defaultLocationName })
    }

    /// Legacy accessor: path of the default location, falling back to "~/.lume"
    /// when no default location is registered.
    var homeDirectory: String {
        guard let location = defaultLocation else { return "~/.lume" }
        return location.path
    }

    static let defaultSettings = LumeSettings(
        vmLocations: [
            VMLocation(name: "default", path: "~/.lume")
        ],
        defaultLocationName: "default",
        cacheDirectory: "~/.lume/cache",
        cachingEnabled: true
    )

    /// All registered locations ordered alphabetically by name.
    var sortedLocations: [VMLocation] {
        vmLocations.sorted(by: { $0.name < $1.name })
    }
}
saveSettings(defaultSettings) return defaultSettings } func saveSettings(_ settings: LumeSettings) throws { try fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true) // Create a human-readable YAML-like configuration file var yamlContent = "# Lume Configuration\n\n" // Default location yamlContent += "defaultLocationName: \"\(settings.defaultLocationName)\"\n" // Cache directory yamlContent += "cacheDirectory: \"\(settings.cacheDirectory)\"\n" // Caching enabled flag yamlContent += "cachingEnabled: \(settings.cachingEnabled)\n" // VM locations yamlContent += "\n# VM Locations\nvmLocations:\n" for location in settings.vmLocations { yamlContent += " - name: \"\(location.name)\"\n" yamlContent += " path: \"\(location.path)\"\n" } // Write YAML content to file try yamlContent.write( to: URL(fileURLWithPath: configFilePath), atomically: true, encoding: .utf8) } // MARK: - VM Location Management func addLocation(_ location: VMLocation) throws { var settings = getSettings() // Validate location name (alphanumeric, dash, underscore) let nameRegex = try NSRegularExpression(pattern: "^[a-zA-Z0-9_-]+$") let nameRange = NSRange(location.name.startIndex..., in: location.name) if nameRegex.firstMatch(in: location.name, range: nameRange) == nil { throw VMLocationError.invalidLocationName(name: location.name) } // Check for duplicate name if settings.vmLocations.contains(where: { $0.name == location.name }) { throw VMLocationError.duplicateLocationName(name: location.name) } // Validate location path try location.validate() // Add location settings.vmLocations.append(location) try saveSettings(settings) } func removeLocation(name: String) throws { var settings = getSettings() // Check location exists guard settings.vmLocations.contains(where: { $0.name == name }) else { throw VMLocationError.locationNotFound(name: name) } // Prevent removing default location if name == settings.defaultLocationName { throw 
VMLocationError.defaultLocationCannotBeRemoved(name: name) } // Remove location settings.vmLocations.removeAll(where: { $0.name == name }) try saveSettings(settings) } func setDefaultLocation(name: String) throws { var settings = getSettings() // Check location exists guard settings.vmLocations.contains(where: { $0.name == name }) else { throw VMLocationError.locationNotFound(name: name) } // Set default settings.defaultLocationName = name try saveSettings(settings) } func getLocation(name: String) throws -> VMLocation { let settings = getSettings() if let location = settings.vmLocations.first(where: { $0.name == name }) { return location } throw VMLocationError.locationNotFound(name: name) } // MARK: - Legacy Home Directory Compatibility func setHomeDirectory(path: String) throws { var settings = getSettings() let defaultLocation = VMLocation(name: "default", path: path) try defaultLocation.validate() // Replace default location if let index = settings.vmLocations.firstIndex(where: { $0.name == "default" }) { settings.vmLocations[index] = defaultLocation } else { settings.vmLocations.append(defaultLocation) settings.defaultLocationName = "default" } try saveSettings(settings) } // MARK: - Cache Directory Management func setCacheDirectory(path: String) throws { var settings = getSettings() // Validate path let expandedPath = (path as NSString).expandingTildeInPath var isDir: ObjCBool = false // If directory exists, check if it's writable if fileManager.fileExists(atPath: expandedPath, isDirectory: &isDir) { if !isDir.boolValue { throw SettingsError.notADirectory(path: expandedPath) } if !fileManager.isWritableFile(atPath: expandedPath) { throw SettingsError.directoryNotWritable(path: expandedPath) } } else { // Try to create the directory do { try fileManager.createDirectory( atPath: expandedPath, withIntermediateDirectories: true ) } catch { throw SettingsError.directoryCreationFailed(path: expandedPath, error: error) } } // Update settings settings.cacheDirectory 
= path try saveSettings(settings) } func getCacheDirectory() -> String { return getSettings().cacheDirectory } func setCachingEnabled(_ enabled: Bool) throws { var settings = getSettings() settings.cachingEnabled = enabled try saveSettings(settings) } func isCachingEnabled() -> Bool { return getSettings().cachingEnabled } // MARK: - Private Helpers private func ensureConfigDirectoryExists() { try? fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true) } private func readSettingsFromFile() -> LumeSettings? { // Read from YAML file if fileExists(at: configFilePath) { do { let yamlString = try String( contentsOf: URL(fileURLWithPath: configFilePath), encoding: .utf8) return parseYamlSettings(yamlString) } catch { Logger.error( "Failed to read settings from YAML file", metadata: ["error": error.localizedDescription] ) } } return nil } private func parseYamlSettings(_ yamlString: String) -> LumeSettings? { // This is a very basic YAML parser for our specific config format // A real implementation would use a proper YAML library var defaultLocationName = "default" var cacheDirectory = "~/.lume/cache" var cachingEnabled = true // default to true for backward compatibility var vmLocations: [VMLocation] = [] var inLocationsSection = false var currentLocation: (name: String?, path: String?) 
= (nil, nil) let lines = yamlString.split(separator: "\n") for (_, line) in lines.enumerated() { let trimmedLine = line.trimmingCharacters(in: .whitespaces) // Skip comments and empty lines if trimmedLine.hasPrefix("#") || trimmedLine.isEmpty { continue } // Check for section marker if trimmedLine == "vmLocations:" { inLocationsSection = true continue } // In the locations section, handle line indentation more carefully if inLocationsSection { if trimmedLine.hasPrefix("-") || trimmedLine.contains("- name:") { // Process the previous location before starting a new one if let name = currentLocation.name, let path = currentLocation.path { vmLocations.append(VMLocation(name: name, path: path)) } currentLocation = (nil, nil) } // Process the key-value pairs within a location if let colonIndex = trimmedLine.firstIndex(of: ":") { let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces) let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...] .trimmingCharacters(in: .whitespaces) let value = extractValueFromYaml(rawValue) if key.hasSuffix("name") { currentLocation.name = value } else if key.hasSuffix("path") { currentLocation.path = value } } } else { // Process top-level keys outside the locations section if let colonIndex = trimmedLine.firstIndex(of: ":") { let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces) let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...] 
.trimmingCharacters(in: .whitespaces) let value = extractValueFromYaml(rawValue) if key == "defaultLocationName" { defaultLocationName = value } else if key == "cacheDirectory" { cacheDirectory = value } else if key == "cachingEnabled" { cachingEnabled = value.lowercased() == "true" } } } } // Don't forget to add the last location if let name = currentLocation.name, let path = currentLocation.path { vmLocations.append(VMLocation(name: name, path: path)) } // Ensure at least one location exists if vmLocations.isEmpty { vmLocations.append(VMLocation(name: "default", path: "~/.lume")) } return LumeSettings( vmLocations: vmLocations, defaultLocationName: defaultLocationName, cacheDirectory: cacheDirectory, cachingEnabled: cachingEnabled ) } // Helper method to extract a value from YAML, handling quotes private func extractValueFromYaml(_ rawValue: String) -> String { if rawValue.hasPrefix("\"") && rawValue.hasSuffix("\"") && rawValue.count >= 2 { // Remove the surrounding quotes let startIndex = rawValue.index(after: rawValue.startIndex) let endIndex = rawValue.index(before: rawValue.endIndex) return String(rawValue[startIndex..<endIndex]) } return rawValue } // Helper method to output debug information about the current settings func debugSettings() -> String { let settings = getSettings() var output = "Current Settings:\n" output += "- Default VM storage: \(settings.defaultLocationName)\n" output += "- Cache directory: \(settings.cacheDirectory)\n" output += "- VM Locations (\(settings.vmLocations.count)):\n" for (i, location) in settings.vmLocations.enumerated() { let isDefault = location.name == settings.defaultLocationName let defaultMark = isDefault ? " (default)" : "" output += " \(i+1). \(location.name): \(location.path)\(defaultMark)\n" } // Also add raw file content if fileExists(at: configFilePath) { if let content = try? 
String(contentsOf: URL(fileURLWithPath: configFilePath)) { output += "\nRaw YAML file content:\n" output += content } } return output } private func fileExists(at path: String) -> Bool { fileManager.fileExists(atPath: path) } } // MARK: - Errors enum SettingsError: Error, LocalizedError { case notADirectory(path: String) case directoryNotWritable(path: String) case directoryCreationFailed(path: String, error: Error) var errorDescription: String? { switch self { case .notADirectory(let path): return "Path is not a directory: \(path)" case .directoryNotWritable(let path): return "Directory is not writable: \(path)" case .directoryCreationFailed(let path, let error): return "Failed to create directory at \(path): \(error.localizedDescription)" } } } ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/adapters/mlxvlm_adapter.py: -------------------------------------------------------------------------------- ```python import asyncio import functools import warnings import io import base64 import math import re from concurrent.futures import ThreadPoolExecutor from typing import Iterator, AsyncIterator, Dict, List, Any, Optional, Tuple, cast from PIL import Image from litellm.types.utils import GenericStreamingChunk, ModelResponse from litellm.llms.custom_llm import CustomLLM from litellm import completion, acompletion # Try to import MLX dependencies try: import mlx.core as mx from mlx_vlm import load, generate from mlx_vlm.prompt_utils import apply_chat_template from mlx_vlm.utils import load_config from transformers.tokenization_utils import PreTrainedTokenizer MLX_AVAILABLE = True except ImportError: MLX_AVAILABLE = False # Constants for smart_resize IMAGE_FACTOR = 28 MIN_PIXELS = 100 * 28 * 28 MAX_PIXELS = 16384 * 28 * 28 MAX_RATIO = 200 def round_by_factor(number: float, factor: int) -> int: """Returns the closest integer to 'number' that is divisible by 'factor'.""" return round(number / factor) * factor 
def ceil_by_factor(number: float, factor: int) -> int: """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'.""" return math.ceil(number / factor) * factor def floor_by_factor(number: float, factor: int) -> int: """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'.""" return math.floor(number / factor) * factor def smart_resize( height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS ) -> tuple[int, int]: """ Rescales the image so that the following conditions are met: 1. Both dimensions (height and width) are divisible by 'factor'. 2. The total number of pixels is within the range ['min_pixels', 'max_pixels']. 3. The aspect ratio of the image is maintained as closely as possible. """ if max(height, width) / min(height, width) > MAX_RATIO: raise ValueError( f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}" ) h_bar = max(factor, round_by_factor(height, factor)) w_bar = max(factor, round_by_factor(width, factor)) if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = floor_by_factor(height / beta, factor) w_bar = floor_by_factor(width / beta, factor) elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = ceil_by_factor(height * beta, factor) w_bar = ceil_by_factor(width * beta, factor) return h_bar, w_bar class MLXVLMAdapter(CustomLLM): """MLX VLM Adapter for running vision-language models locally using MLX.""" def __init__(self, **kwargs): """Initialize the adapter. 
Args: **kwargs: Additional arguments """ super().__init__() self.models = {} # Cache for loaded models self.processors = {} # Cache for loaded processors self.configs = {} # Cache for loaded configs self._executor = ThreadPoolExecutor(max_workers=1) # Single thread pool def _load_model_and_processor(self, model_name: str): """Load model and processor if not already cached. Args: model_name: Name of the model to load Returns: Tuple of (model, processor, config) """ if not MLX_AVAILABLE: raise ImportError("MLX VLM dependencies not available. Please install mlx-vlm.") if model_name not in self.models: # Load model and processor model_obj, processor = load( model_name, processor_kwargs={"min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS} ) config = load_config(model_name) # Cache them self.models[model_name] = model_obj self.processors[model_name] = processor self.configs[model_name] = config return self.models[model_name], self.processors[model_name], self.configs[model_name] def _process_coordinates(self, text: str, original_size: Tuple[int, int], model_size: Tuple[int, int]) -> str: """Process coordinates in box tokens based on image resizing using smart_resize approach. 
Args: text: Text containing box tokens original_size: Original image size (width, height) model_size: Model processed image size (width, height) Returns: Text with processed coordinates """ # Find all box tokens box_pattern = r"<\|box_start\|>\((\d+),\s*(\d+)\)<\|box_end\|>" def process_coords(match): model_x, model_y = int(match.group(1)), int(match.group(2)) # Scale coordinates from model space to original image space # Both original_size and model_size are in (width, height) format new_x = int(model_x * original_size[0] / model_size[0]) # Width new_y = int(model_y * original_size[1] / model_size[1]) # Height return f"<|box_start|>({new_x},{new_y})<|box_end|>" return re.sub(box_pattern, process_coords, text) def _convert_messages(self, messages: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Image.Image], Dict[int, Tuple[int, int]], Dict[int, Tuple[int, int]]]: """Convert OpenAI format messages to MLX VLM format and extract images. Args: messages: Messages in OpenAI format Returns: Tuple of (processed_messages, images, original_sizes, model_sizes) """ processed_messages = [] images = [] original_sizes = {} # Track original sizes of images for coordinate mapping model_sizes = {} # Track model processed sizes image_index = 0 for message in messages: processed_message = { "role": message["role"], "content": [] } content = message.get("content", []) if isinstance(content, str): # Simple text content processed_message["content"] = content elif isinstance(content, list): # Multi-modal content processed_content = [] for item in content: if item.get("type") == "text": processed_content.append({ "type": "text", "text": item.get("text", "") }) elif item.get("type") == "image_url": image_url = item.get("image_url", {}).get("url", "") pil_image = None if image_url.startswith("data:image/"): # Extract base64 data base64_data = image_url.split(',')[1] # Convert base64 to PIL Image image_data = base64.b64decode(base64_data) pil_image = 
Image.open(io.BytesIO(image_data)) else: # Handle file path or URL pil_image = Image.open(image_url) # Store original image size for coordinate mapping original_size = pil_image.size original_sizes[image_index] = original_size # Use smart_resize to determine model size # Note: smart_resize expects (height, width) but PIL gives (width, height) height, width = original_size[1], original_size[0] new_height, new_width = smart_resize(height, width) # Store model size in (width, height) format for consistent coordinate processing model_sizes[image_index] = (new_width, new_height) # Resize the image using the calculated dimensions from smart_resize resized_image = pil_image.resize((new_width, new_height)) images.append(resized_image) # Add image placeholder to content processed_content.append({ "type": "image" }) image_index += 1 processed_message["content"] = processed_content processed_messages.append(processed_message) return processed_messages, images, original_sizes, model_sizes def _generate(self, **kwargs) -> str: """Generate response using the local MLX VLM model. 
Args: **kwargs: Keyword arguments containing messages and model info Returns: Generated text response """ messages = kwargs.get('messages', []) model_name = kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit') max_tokens = kwargs.get('max_tokens', 128) # Warn about ignored kwargs ignored_kwargs = set(kwargs.keys()) - {'messages', 'model', 'max_tokens'} if ignored_kwargs: warnings.warn(f"Ignoring unsupported kwargs: {ignored_kwargs}") # Load model and processor model, processor, config = self._load_model_and_processor(model_name) # Convert messages and extract images processed_messages, images, original_sizes, model_sizes = self._convert_messages(messages) # Process user text input with box coordinates after image processing # Swap original_size and model_size arguments for inverse transformation for msg_idx, msg in enumerate(processed_messages): if msg.get("role") == "user" and isinstance(msg.get("content"), str): content = msg.get("content", "") if "<|box_start|>" in content and original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes: orig_size = original_sizes[0] model_size = model_sizes[0] # Swap arguments to perform inverse transformation for user input processed_messages[msg_idx]["content"] = self._process_coordinates(content, model_size, orig_size) try: # Format prompt according to model requirements using the processor directly prompt = processor.apply_chat_template( processed_messages, tokenize=False, add_generation_prompt=True, return_tensors='pt' ) tokenizer = cast(PreTrainedTokenizer, processor) # Generate response text_content, usage = generate( model, tokenizer, str(prompt), images, # type: ignore verbose=False, max_tokens=max_tokens ) except Exception as e: raise RuntimeError(f"Error generating response: {str(e)}") from e # Process coordinates in the response back to original image space if original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes: # Get original image size and model size (using the first 
image) orig_size = original_sizes[0] model_size = model_sizes[0] # Check if output contains box tokens that need processing if "<|box_start|>" in text_content: # Process coordinates from model space back to original image space text_content = self._process_coordinates(text_content, orig_size, model_size) return text_content def completion(self, *args, **kwargs) -> ModelResponse: """Synchronous completion method. Returns: ModelResponse with generated text """ generated_text = self._generate(**kwargs) result = completion( model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}", mock_response=generated_text, ) return cast(ModelResponse, result) async def acompletion(self, *args, **kwargs) -> ModelResponse: """Asynchronous completion method. Returns: ModelResponse with generated text """ # Run _generate in thread pool to avoid blocking loop = asyncio.get_event_loop() generated_text = await loop.run_in_executor( self._executor, functools.partial(self._generate, **kwargs) ) result = await acompletion( model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}", mock_response=generated_text, ) return cast(ModelResponse, result) def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]: """Synchronous streaming method. Returns: Iterator of GenericStreamingChunk """ generated_text = self._generate(**kwargs) generic_streaming_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": generated_text, "tool_use": None, "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}, } yield generic_streaming_chunk async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]: """Asynchronous streaming method. 
Returns: AsyncIterator of GenericStreamingChunk """ # Run _generate in thread pool to avoid blocking loop = asyncio.get_event_loop() generated_text = await loop.run_in_executor( self._executor, functools.partial(self._generate, **kwargs) ) generic_streaming_chunk: GenericStreamingChunk = { "finish_reason": "stop", "index": 0, "is_finished": True, "text": generated_text, "tool_use": None, "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}, } yield generic_streaming_chunk ``` -------------------------------------------------------------------------------- /libs/python/agent/agent/loops/omniparser.py: -------------------------------------------------------------------------------- ```python """ OpenAI computer-use-preview agent loop implementation using liteLLM Paper: https://arxiv.org/abs/2408.00203 Code: https://github.com/microsoft/OmniParser """ import asyncio import json from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple import litellm import inspect import base64 from ..decorators import register_agent from ..types import Messages, AgentResponse, Tools, AgentCapability from ..loops.base import AsyncAgentConfig SOM_TOOL_SCHEMA = { "type": "function", "name": "computer", "description": "Control a computer by taking screenshots and interacting with UI elements. This tool shows screenshots with numbered elements overlaid on them. Each UI element has been assigned a unique ID number that you can see in the image. 
Use the element's ID number to interact with any element instead of pixel coordinates.", "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": [ "screenshot", "click", "double_click", "drag", "type", "keypress", "scroll", "move", "wait", "get_current_url", "get_dimensions", "get_environment" ], "description": "The action to perform" }, "element_id": { "type": "integer", "description": "The ID of the element to interact with (required for click, double_click, move, scroll actions, and as start/end for drag)" }, "start_element_id": { "type": "integer", "description": "The ID of the element to start dragging from (required for drag action)" }, "end_element_id": { "type": "integer", "description": "The ID of the element to drag to (required for drag action)" }, "text": { "type": "string", "description": "The text to type (required for type action)" }, "keys": { "type": "string", "description": "Key combination to press (required for keypress action). Single key for individual key press, multiple keys for combinations (e.g., 'ctrl+c')" }, "button": { "type": "string", "description": "The mouse button to use for click action (left, right, wheel, back, forward) Default: left", }, "scroll_x": { "type": "integer", "description": "Horizontal scroll amount for scroll action (positive for right, negative for left)", }, "scroll_y": { "type": "integer", "description": "Vertical scroll amount for scroll action (positive for down, negative for up)", }, }, "required": [ "action" ] } } OMNIPARSER_AVAILABLE = False try: from som import OmniParser OMNIPARSER_AVAILABLE = True except ImportError: pass OMNIPARSER_SINGLETON = None def get_parser(): global OMNIPARSER_SINGLETON if OMNIPARSER_SINGLETON is None: OMNIPARSER_SINGLETON = OmniParser() return OMNIPARSER_SINGLETON def get_last_computer_call_output(messages: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """Get the last computer_call_output message from a messages list. 
Args: messages: List of messages to search through Returns: The last computer_call_output message dict, or None if not found """ for message in reversed(messages): if isinstance(message, dict) and message.get("type") == "computer_call_output": return message return None def _prepare_tools_for_omniparser(tool_schemas: List[Dict[str, Any]]) -> Tuple[Tools, dict]: """Prepare tools for OpenAI API format""" omniparser_tools = [] id2xy = dict() for schema in tool_schemas: if schema["type"] == "computer": omniparser_tools.append(SOM_TOOL_SCHEMA) if "id2xy" in schema: id2xy = schema["id2xy"] else: schema["id2xy"] = id2xy elif schema["type"] == "function": # Function tools use OpenAI-compatible schema directly (liteLLM expects this format) # Schema should be: {type, name, description, parameters} omniparser_tools.append({ "type": "function", **schema["function"] }) return omniparser_tools, id2xy async def replace_function_with_computer_call(item: Dict[str, Any], id2xy: Dict[int, Tuple[float, float]]): item_type = item.get("type") def _get_xy(element_id: Optional[int]) -> Union[Tuple[float, float], Tuple[None, None]]: if element_id is None: return (None, None) return id2xy.get(element_id, (None, None)) if item_type == "function_call": fn_name = item.get("name") fn_args = json.loads(item.get("arguments", "{}")) item_id = item.get("id") call_id = item.get("call_id") if fn_name == "computer": action = fn_args.get("action") element_id = fn_args.get("element_id") start_element_id = fn_args.get("start_element_id") end_element_id = fn_args.get("end_element_id") text = fn_args.get("text") keys = fn_args.get("keys") button = fn_args.get("button") scroll_x = fn_args.get("scroll_x") scroll_y = fn_args.get("scroll_y") x, y = _get_xy(element_id) start_x, start_y = _get_xy(start_element_id) end_x, end_y = _get_xy(end_element_id) action_args = { "type": action, "x": x, "y": y, "start_x": start_x, "start_y": start_y, "end_x": end_x, "end_y": end_y, "text": text, "keys": keys, "button": 
button, "scroll_x": scroll_x, "scroll_y": scroll_y } # Remove None values to keep the JSON clean action_args = {k: v for k, v in action_args.items() if v is not None} return [{ "type": "computer_call", "action": action_args, "id": item_id, "call_id": call_id, "status": "completed" }] return [item] async def replace_computer_call_with_function(item: Dict[str, Any], xy2id: Dict[Tuple[float, float], int]): """ Convert computer_call back to function_call format. Also handles computer_call_output -> function_call_output conversion. Args: item: The item to convert xy2id: Mapping from (x, y) coordinates to element IDs """ item_type = item.get("type") def _get_element_id(x: Optional[float], y: Optional[float]) -> Optional[int]: """Get element ID from coordinates, return None if coordinates are None""" if x is None or y is None: return None return xy2id.get((x, y)) if item_type == "computer_call": action_data = item.get("action", {}) # Extract coordinates and convert back to element IDs element_id = _get_element_id(action_data.get("x"), action_data.get("y")) start_element_id = _get_element_id(action_data.get("start_x"), action_data.get("start_y")) end_element_id = _get_element_id(action_data.get("end_x"), action_data.get("end_y")) # Build function arguments fn_args = { "action": action_data.get("type"), "element_id": element_id, "start_element_id": start_element_id, "end_element_id": end_element_id, "text": action_data.get("text"), "keys": action_data.get("keys"), "button": action_data.get("button"), "scroll_x": action_data.get("scroll_x"), "scroll_y": action_data.get("scroll_y") } # Remove None values to keep the JSON clean fn_args = {k: v for k, v in fn_args.items() if v is not None} return [{ "type": "function_call", "name": "computer", "arguments": json.dumps(fn_args), "id": item.get("id"), "call_id": item.get("call_id"), "status": "completed", # Fall back to string representation "content": f"Used tool: {action_data.get("type")}({json.dumps(fn_args)})" }] elif 
item_type == "computer_call_output": # Simple conversion: computer_call_output -> function_call_output return [{ "type": "function_call_output", "call_id": item.get("call_id"), "content": [item.get("output")], "id": item.get("id"), "status": "completed" }] return [item] @register_agent(models=r"omniparser\+.*|omni\+.*", priority=2) class OmniparserConfig(AsyncAgentConfig): """Omniparser agent configuration implementing AsyncAgentConfig protocol.""" async def predict_step( self, messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict[str, Any]]] = None, max_retries: Optional[int] = None, stream: bool = False, computer_handler=None, use_prompt_caching: Optional[bool] = False, _on_api_start=None, _on_api_end=None, _on_usage=None, _on_screenshot=None, **kwargs ) -> Dict[str, Any]: """ OpenAI computer-use-preview agent loop using liteLLM responses. Supports OpenAI's computer use preview models. """ if not OMNIPARSER_AVAILABLE: raise ValueError("omniparser loop requires som to be installed. 
Install it with `pip install cua-som`.") tools = tools or [] llm_model = model.split('+')[-1] # Prepare tools for OpenAI API openai_tools, id2xy = _prepare_tools_for_omniparser(tools) # Find last computer_call_output last_computer_call_output = get_last_computer_call_output(messages) # type: ignore if last_computer_call_output: image_url = last_computer_call_output.get("output", {}).get("image_url", "") image_data = image_url.split(",")[-1] if image_data: parser = get_parser() result = parser.parse(image_data) if _on_screenshot: await _on_screenshot(result.annotated_image_base64, "annotated_image") for element in result.elements: id2xy[element.id] = ((element.bbox.x1 + element.bbox.x2) / 2, (element.bbox.y1 + element.bbox.y2) / 2) # handle computer calls -> function calls new_messages = [] for message in messages: if not isinstance(message, dict): message = message.__dict__ new_messages += await replace_computer_call_with_function(message, id2xy) # type: ignore messages = new_messages # Prepare API call kwargs api_kwargs = { "model": llm_model, "input": messages, "tools": openai_tools if openai_tools else None, "stream": stream, "truncation": "auto", "num_retries": max_retries, **kwargs } # Call API start hook if _on_api_start: await _on_api_start(api_kwargs) print(str(api_kwargs)[:1000]) # Use liteLLM responses response = await litellm.aresponses(**api_kwargs) # Call API end hook if _on_api_end: await _on_api_end(api_kwargs, response) # Extract usage information usage = { **response.usage.model_dump(), # type: ignore "response_cost": response._hidden_params.get("response_cost", 0.0), # type: ignore } if _on_usage: await _on_usage(usage) # handle som function calls -> xy computer calls new_output = [] for i in range(len(response.output)): # type: ignore new_output += await replace_function_with_computer_call(response.output[i].model_dump(), id2xy) # type: ignore return { "output": new_output, "usage": usage } async def predict_click( self, model: str, image_b64: 
str, instruction: str, **kwargs ) -> Optional[Tuple[float, float]]: """ Predict click coordinates using OmniParser and LLM. Uses OmniParser to annotate the image with element IDs, then uses LLM to identify the correct element ID based on the instruction. """ if not OMNIPARSER_AVAILABLE: return None # Parse the image with OmniParser to get annotated image and elements parser = get_parser() result = parser.parse(image_b64) # Extract the LLM model from composed model string llm_model = model.split('+')[-1] # Create system prompt for element ID prediction SYSTEM_PROMPT = f''' You are an expert UI element locator. Given a GUI image annotated with numerical IDs over each interactable element, along with a user's element description, provide the ID of the specified element. The image shows UI elements with numbered overlays. Each number corresponds to a clickable/interactable element. Output only the element ID as a single integer. '''.strip() # Prepare messages for LLM messages = [ { "role": "system", "content": SYSTEM_PROMPT }, { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{result.annotated_image_base64}" } }, { "type": "text", "text": f"Find the element: {instruction}" } ] } ] # Call LLM to predict element ID response = await litellm.acompletion( model=llm_model, messages=messages, max_tokens=10, temperature=0.1 ) # Extract element ID from response response_text = response.choices[0].message.content.strip() # type: ignore # Try to parse the element ID try: element_id = int(response_text) # Find the element with this ID and return its center coordinates for element in result.elements: if element.id == element_id: center_x = (element.bbox.x1 + element.bbox.x2) / 2 center_y = (element.bbox.y1 + element.bbox.y2) / 2 return (center_x, center_y) except ValueError: # If we can't parse the ID, return None pass return None def get_capabilities(self) -> List[AgentCapability]: """Return the capabilities supported by this 
agent.""" return ["step"] ``` -------------------------------------------------------------------------------- /libs/python/computer/computer/interface/base.py: -------------------------------------------------------------------------------- ```python """Base interface for computer control.""" from abc import ABC, abstractmethod from typing import Optional, Dict, Any, Tuple, List from ..logger import Logger, LogLevel from .models import MouseButton, CommandResult class BaseComputerInterface(ABC): """Base class for computer control interfaces.""" def __init__(self, ip_address: str, username: str = "lume", password: str = "lume", api_key: Optional[str] = None, vm_name: Optional[str] = None): """Initialize interface. Args: ip_address: IP address of the computer to control username: Username for authentication password: Password for authentication api_key: Optional API key for cloud authentication vm_name: Optional VM name for cloud authentication """ self.ip_address = ip_address self.username = username self.password = password self.api_key = api_key self.vm_name = vm_name self.logger = Logger("cua.interface", LogLevel.NORMAL) # Optional default delay time between commands (in seconds) self.delay: float = 0.0 @abstractmethod async def wait_for_ready(self, timeout: int = 60) -> None: """Wait for interface to be ready. Args: timeout: Maximum time to wait in seconds Raises: TimeoutError: If interface is not ready within timeout """ pass @abstractmethod def close(self) -> None: """Close the interface connection.""" pass def force_close(self) -> None: """Force close the interface connection. By default, this just calls close(), but subclasses can override to provide more forceful cleanup. """ self.close() # Mouse Actions @abstractmethod async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left", delay: Optional[float] = None) -> None: """Press and hold a mouse button. Args: x: X coordinate to press at. 
If None, uses current cursor position. y: Y coordinate to press at. If None, uses current cursor position. button: Mouse button to press ('left', 'middle', 'right'). delay: Optional delay in seconds after the action """ pass @abstractmethod async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left", delay: Optional[float] = None) -> None: """Release a mouse button. Args: x: X coordinate to release at. If None, uses current cursor position. y: Y coordinate to release at. If None, uses current cursor position. button: Mouse button to release ('left', 'middle', 'right'). delay: Optional delay in seconds after the action """ pass @abstractmethod async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: """Perform a left mouse button click. Args: x: X coordinate to click at. If None, uses current cursor position. y: Y coordinate to click at. If None, uses current cursor position. delay: Optional delay in seconds after the action """ pass @abstractmethod async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: """Perform a right mouse button click. Args: x: X coordinate to click at. If None, uses current cursor position. y: Y coordinate to click at. If None, uses current cursor position. delay: Optional delay in seconds after the action """ pass @abstractmethod async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None: """Perform a double left mouse button click. Args: x: X coordinate to double-click at. If None, uses current cursor position. y: Y coordinate to double-click at. If None, uses current cursor position. delay: Optional delay in seconds after the action """ pass @abstractmethod async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None: """Move the cursor to the specified screen coordinates. 
Args: x: X coordinate to move cursor to. y: Y coordinate to move cursor to. delay: Optional delay in seconds after the action """ pass @abstractmethod async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None: """Drag from current position to specified coordinates. Args: x: The x coordinate to drag to y: The y coordinate to drag to button: The mouse button to use ('left', 'middle', 'right') duration: How long the drag should take in seconds delay: Optional delay in seconds after the action """ pass @abstractmethod async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None: """Drag the cursor along a path of coordinates. Args: path: List of (x, y) coordinate tuples defining the drag path button: The mouse button to use ('left', 'middle', 'right') duration: Total time in seconds that the drag operation should take delay: Optional delay in seconds after the action """ pass # Keyboard Actions @abstractmethod async def key_down(self, key: str, delay: Optional[float] = None) -> None: """Press and hold a key. Args: key: The key to press and hold (e.g., 'a', 'shift', 'ctrl'). delay: Optional delay in seconds after the action. """ pass @abstractmethod async def key_up(self, key: str, delay: Optional[float] = None) -> None: """Release a previously pressed key. Args: key: The key to release (e.g., 'a', 'shift', 'ctrl'). delay: Optional delay in seconds after the action. """ pass @abstractmethod async def type_text(self, text: str, delay: Optional[float] = None) -> None: """Type the specified text string. Args: text: The text string to type. delay: Optional delay in seconds after the action. """ pass @abstractmethod async def press_key(self, key: str, delay: Optional[float] = None) -> None: """Press and release a single key. Args: key: The key to press (e.g., 'a', 'enter', 'escape'). delay: Optional delay in seconds after the action. 
""" pass @abstractmethod async def hotkey(self, *keys: str, delay: Optional[float] = None) -> None: """Press multiple keys simultaneously (keyboard shortcut). Args: *keys: Variable number of keys to press together (e.g., 'ctrl', 'c'). delay: Optional delay in seconds after the action. """ pass # Scrolling Actions @abstractmethod async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None: """Scroll the mouse wheel by specified amounts. Args: x: Horizontal scroll amount (positive = right, negative = left). y: Vertical scroll amount (positive = up, negative = down). delay: Optional delay in seconds after the action. """ pass @abstractmethod async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None: """Scroll down by the specified number of clicks. Args: clicks: Number of scroll clicks to perform downward. delay: Optional delay in seconds after the action. """ pass @abstractmethod async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None: """Scroll up by the specified number of clicks. Args: clicks: Number of scroll clicks to perform upward. delay: Optional delay in seconds after the action. """ pass # Screen Actions @abstractmethod async def screenshot(self) -> bytes: """Take a screenshot. Returns: Raw bytes of the screenshot image """ pass @abstractmethod async def get_screen_size(self) -> Dict[str, int]: """Get the screen dimensions. Returns: Dict with 'width' and 'height' keys """ pass @abstractmethod async def get_cursor_position(self) -> Dict[str, int]: """Get the current cursor position on screen. Returns: Dict with 'x' and 'y' keys containing cursor coordinates. """ pass # Clipboard Actions @abstractmethod async def copy_to_clipboard(self) -> str: """Get the current clipboard content. Returns: The text content currently stored in the clipboard. """ pass @abstractmethod async def set_clipboard(self, text: str) -> None: """Set the clipboard content to the specified text. 
Args: text: The text to store in the clipboard. """ pass # File System Actions @abstractmethod async def file_exists(self, path: str) -> bool: """Check if a file exists at the specified path. Args: path: The file path to check. Returns: True if the file exists, False otherwise. """ pass @abstractmethod async def directory_exists(self, path: str) -> bool: """Check if a directory exists at the specified path. Args: path: The directory path to check. Returns: True if the directory exists, False otherwise. """ pass @abstractmethod async def list_dir(self, path: str) -> List[str]: """List the contents of a directory. Args: path: The directory path to list. Returns: List of file and directory names in the specified directory. """ pass @abstractmethod async def read_text(self, path: str) -> str: """Read the text contents of a file. Args: path: The file path to read from. Returns: The text content of the file. """ pass @abstractmethod async def write_text(self, path: str, content: str) -> None: """Write text content to a file. Args: path: The file path to write to. content: The text content to write. """ pass @abstractmethod async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes: """Read file binary contents with optional seeking support. Args: path: Path to the file offset: Byte offset to start reading from (default: 0) length: Number of bytes to read (default: None for entire file) """ pass @abstractmethod async def write_bytes(self, path: str, content: bytes) -> None: """Write binary content to a file. Args: path: The file path to write to. content: The binary content to write. """ pass @abstractmethod async def delete_file(self, path: str) -> None: """Delete a file at the specified path. Args: path: The file path to delete. """ pass @abstractmethod async def create_dir(self, path: str) -> None: """Create a directory at the specified path. Args: path: The directory path to create. 
""" pass @abstractmethod async def delete_dir(self, path: str) -> None: """Delete a directory at the specified path. Args: path: The directory path to delete. """ pass @abstractmethod async def get_file_size(self, path: str) -> int: """Get the size of a file in bytes. Args: path: The file path to get the size of. Returns: The size of the file in bytes. """ pass @abstractmethod async def run_command(self, command: str) -> CommandResult: """Run shell command and return structured result. Executes a shell command using subprocess.run with shell=True and check=False. The command is run in the target environment and captures both stdout and stderr. Args: command (str): The shell command to execute Returns: CommandResult: A structured result containing: - stdout (str): Standard output from the command - stderr (str): Standard error from the command - returncode (int): Exit code from the command (0 indicates success) Raises: RuntimeError: If the command execution fails at the system level Example: result = await interface.run_command("ls -la") if result.returncode == 0: print(f"Output: {result.stdout}") else: print(f"Error: {result.stderr}, Exit code: {result.returncode}") """ pass # Accessibility Actions @abstractmethod async def get_accessibility_tree(self) -> Dict: """Get the accessibility tree of the current screen. Returns: Dict containing the hierarchical accessibility information of screen elements. """ pass @abstractmethod async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screenshot coordinates to screen coordinates. Args: x: X coordinate in screenshot space y: Y coordinate in screenshot space Returns: tuple[float, float]: (x, y) coordinates in screen space """ pass @abstractmethod async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: """Convert screen coordinates to screenshot coordinates. 
Args: x: X coordinate in screen space y: Y coordinate in screen space Returns: tuple[float, float]: (x, y) coordinates in screenshot space """ pass ``` -------------------------------------------------------------------------------- /libs/lumier/src/lib/vm.sh: -------------------------------------------------------------------------------- ```bash #!/usr/bin/env bash # Initialize global flags export PULL_IN_PROGRESS=0 start_vm() { # Determine storage path for VM STORAGE_PATH="$HOST_STORAGE_PATH" if [ -z "$STORAGE_PATH" ]; then STORAGE_PATH="storage_${VM_NAME}" fi # Check if VM exists and its status using JSON format - quietly VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}") # Check if VM not found error if [[ $VM_INFO == *"Virtual machine not found"* ]]; then IMAGE_NAME="${VERSION##*/}" # Parse registry and organization from VERSION REGISTRY=$(echo $VERSION | cut -d'/' -f1) ORGANIZATION=$(echo $VERSION | cut -d'/' -f2) echo "Pulling VM image $IMAGE_NAME..." lume_pull "$IMAGE_NAME" "$VM_NAME" "$STORAGE_PATH" "$REGISTRY" "$ORGANIZATION" else # Parse the JSON status - check if it contains "status" : "running" if [[ $VM_INFO == *'"status" : "running"'* ]]; then lume_stop "$VM_NAME" "$STORAGE_PATH" fi fi # Format memory size for display purposes MEMORY_DISPLAY="$RAM_SIZE" if [[ ! "$RAM_SIZE" == *"GB"* && ! 
"$RAM_SIZE" == *"MB"* ]]; then MEMORY_DISPLAY="${RAM_SIZE}MB" fi # Set VM parameters using the wrapper function if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "Updating VM settings: cpu=$CPU_CORES memory=$MEMORY_DISPLAY display=$DISPLAY" fi lume_set "$VM_NAME" "$STORAGE_PATH" "$CPU_CORES" "$RAM_SIZE" "$DISPLAY" # Fetch VM configuration - quietly (don't display to console) CONFIG_JSON=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}") # Setup shared directory args if necessary SHARED_DIR_ARGS="" if [ -d "/shared" ]; then if [ -n "$HOST_SHARED_PATH" ]; then SHARED_DIR_ARGS="--shared-dir=$HOST_SHARED_PATH" else echo "Warning: /shared volume exists but HOST_SHARED_PATH is not set. Cannot mount volume." fi fi # Run VM with VNC and shared directory using curl lume_run $SHARED_DIR_ARGS --storage "$STORAGE_PATH" "$VM_NAME" & # lume run "$VM_NAME" --storage "$STORAGE_PATH" --no-display # sleep 10000000 # Wait for VM to be running and VNC URL to be available vm_ip="" vnc_url="" max_attempts=30 attempt=0 while [ $attempt -lt $max_attempts ]; do # Get VM info as JSON using the API function - pass debug flag VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}") # Extract status, IP address, and VNC URL using the helper function vm_status=$(extract_json_field "status" "$VM_INFO") vm_ip=$(extract_json_field "ipAddress" "$VM_INFO") vnc_url=$(extract_json_field "vncUrl" "$VM_INFO") # Check if VM status is 'running' and we have IP and VNC URL if [ "$vm_status" = "running" ] && [ -n "$vm_ip" ] && [ -n "$vnc_url" ]; then break fi sleep 2 attempt=$((attempt + 1)) done if [ -z "$vm_ip" ] || [ -z "$vnc_url" ]; then echo "Timed out waiting for VM to start or VNC URL to become available." 
lume_stop "$VM_NAME" "$STORAGE_PATH" > /dev/null 2>&1 # lume stop "$VM_NAME" --storage "$STORAGE_PATH" > /dev/null 2>&1 exit 1 fi # Parse VNC URL to extract password and port VNC_PASSWORD=$(echo "$vnc_url" | sed -n 's/.*:\(.*\)@.*/\1/p') VNC_PORT=$(echo "$vnc_url" | sed -n 's/.*:\([0-9]\+\)$/\1/p') # Wait for SSH to become available wait_for_ssh "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" 5 20 # Export VNC variables for entry.sh to use export VNC_PORT export VNC_PASSWORD # Execute on-logon.sh if present on_logon_script="/run/lifecycle/on-logon.sh" # Only show detailed logs in debug mode if [ "${LUMIER_DEBUG:-0}" == "1" ]; then echo "Running on-logon.sh hook script on VM..." fi # Check if script exists if [ ! -f "$on_logon_script" ]; then echo "Warning: on-logon.sh hook script not found at $on_logon_script" else # Execute the remote script execute_remote_script "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" "$on_logon_script" "$VNC_PASSWORD" "$HOST_SHARED_PATH" fi } # Get VM information using curl lume_get() { local vm_name="$1" local storage="$2" local format="${3:-json}" local debug="${4:-false}" local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # URL encode the storage path for the query parameter # Replace special characters with their URL encoded equivalents local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g') # Construct API URL with encoded storage parameter local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}" # Construct the full curl command local curl_cmd="curl --connect-timeout 6000 --max-time 5000 -s '$api_url'" # Print debug info if [[ "$debug" == "true" || "$LUMIER_DEBUG" == "1" ]]; then echo "[DEBUG] Calling API: $api_url" echo "[DEBUG] Full curl command: $curl_cmd" fi # Log curl commands only when in debug mode if [[ "$debug" == "true" || "$LUMIER_DEBUG" == "1" ]]; then echo "[$(date -u '+%Y-%m-%dT%H:%M:%SZ')] DEBUG: Executing curl 
request: $api_url" >&2 fi # Make the API call local response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -s \ "$api_url") # Print the response if debugging is enabled if [[ "$debug" == "true" || "${LUMIER_DEBUG:-0}" == "1" ]]; then echo "[DEBUG] API Response:" echo "$response" | jq '.' 2>/dev/null || echo "$response" fi # Output the response so callers can capture it echo "$response" } # Set VM properties using curl lume_set() { local vm_name="$1" local storage="$2" local cpu="${3:-4}" local memory="${4:-8192}" local display="${5:-1024x768}" local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # Handle memory format for the API if [[ "$memory" == *"GB"* ]]; then # Already in GB format, keep as is : # No-op elif [[ "$memory" =~ ^[0-9]+$ ]]; then # If memory is a simple number, assume MB and convert to GB memory="$(awk "BEGIN { printf \"%.1f\", $memory/1024 }")GB" fi # Only show memory formatting debug in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "[DEBUG] Formatted memory value: $memory" fi # Store response to conditionally show based on debug mode local response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -s \ -X PATCH \ -H "Content-Type: application/json" \ -d "{ \"cpu\": $cpu, \"memory\": \"$memory\", \"display\": \"$display\", \"storage\": \"$storage\" }" \ "http://${api_host}:${api_port}/lume/vms/${vm_name}") # Only show response in debug mode if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then echo "$response" fi } stop_vm() { local in_cleanup=${1:-false} # Optional first argument to indicate if called from cleanup trap echo "Stopping VM '$VM_NAME'..." 
STORAGE_PATH="$HOST_STORAGE_PATH" # Only show storage path in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "STORAGE_PATH: $STORAGE_PATH" fi VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}") vm_status=$(extract_json_field "status" "$VM_INFO") if [ "$vm_status" == "running" ]; then lume_stop "$VM_NAME" "$STORAGE_PATH" elif [ "$vm_status" == "stopped" ]; then echo "VM '$VM_NAME' is already stopped." elif [ "$in_cleanup" = true ]; then # If we are in the cleanup trap and status is unknown or VM not found, # still attempt a stop just in case. echo "VM status is unknown ('$vm_status') or VM not found during cleanup. Attempting stop anyway." lume_stop "$VM_NAME" "$STORAGE_PATH" sleep 5 echo "VM '$VM_NAME' stop command issued as a precaution." else echo "VM status is unknown ('$vm_status') or VM not found. Not attempting stop." fi } is_vm_running() { # Check VM status using the API function local vm_info vm_info=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH") if [[ $vm_info == *'"status" : "running"'* ]]; then return 0 # Running else return 1 # Not running or doesn't exist fi # lume ls | grep -q "$VM_NAME" # Old CLI check } # Stop VM with storage location specified using curl lume_stop() { local vm_name="$1" local storage="$2" local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # Only log in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "Stopping VM $vm_name..." 
fi # Execute command and capture response local response if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then # Show output in debug mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -X POST \ -H "Content-Type: application/json" \ -d '{"storage":"'$storage'"}' \ "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop") echo "$response" else # Run silently in normal mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -s \ -X POST \ -H "Content-Type: application/json" \ -d '{"storage":"'$storage'"}' \ "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop") fi } # Pull a VM image using curl lume_pull() { local image="$1" # Image name with tag local vm_name="$2" # Name for the new VM local storage="$3" # Storage location local registry="${4:-ghcr.io}" # Registry, default is ghcr.io local organization="${5:-trycua}" # Organization, default is trycua local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # Mark that pull is in progress for interrupt handling export PULL_IN_PROGRESS=1 # Only log full details in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "Pulling image $image from $registry/$organization..." else echo "Pulling image $image..." 
fi # Inform users how to check pull progress echo "You can check the pull progress using: lume logs -f" # Pull image via API and capture response local response if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then # Show full response in debug mode - no timeout limits response=$(curl \ -X POST \ -H "Content-Type: application/json" \ -d "{ \"image\": \"$image\", \"name\": \"$vm_name\", \"registry\": \"$registry\", \"organization\": \"$organization\", \"storage\": \"$storage\" }" \ "http://${api_host}:${api_port}/lume/pull") echo "$response" else # Run silently in normal mode - no timeout limits response=$(curl \ -s \ -X POST \ -H "Content-Type: application/json" \ -d "{ \"image\": \"$image\", \"name\": \"$vm_name\", \"registry\": \"$registry\", \"organization\": \"$organization\", \"storage\": \"$storage\" }" \ "http://${api_host}:${api_port}/lume/pull") fi # Unset pull in progress flag export PULL_IN_PROGRESS=0 } # Run VM with VNC client started and shared directory using curl lume_run() { # Parse args local shared_dir="" local storage="" local vm_name="lume_vm" local no_display=true while [[ $# -gt 0 ]]; do case $1 in --shared-dir=*) shared_dir="${1#*=}" shift ;; --storage) storage="$2" shift 2 ;; --no-display) no_display=true shift ;; *) # Assume last arg is VM name if not an option vm_name="$1" shift ;; esac done local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # Only log in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "Running VM $vm_name..." 
fi # Build the JSON body dynamically based on what's provided local json_body="{\"noDisplay\": true" # Only include shared directories if shared_dir is provided if [[ -n "$shared_dir" ]]; then json_body+=", \"sharedDirectories\": [{\"hostPath\": \"$shared_dir\", \"readOnly\": false}]" fi # Only include storage if it's provided if [[ -n "$storage" ]]; then json_body+=", \"storage\": \"$storage\"" fi # Add recovery mode (always false) json_body+=", \"recoveryMode\": false}" # Execute the command and store the response local response if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then # Show response in debug mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -X POST \ -H 'Content-Type: application/json' \ -d "$json_body" \ http://${api_host}:${api_port}/lume/vms/$vm_name/run) echo "$response" else # Run silently in normal mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -s \ -X POST \ -H 'Content-Type: application/json' \ -d "$json_body" \ http://${api_host}:${api_port}/lume/vms/$vm_name/run) fi } # Delete a VM using curl lume_delete() { local vm_name="$1" local storage="$2" local api_host="${LUME_API_HOST:-host.docker.internal}" local api_port="${LUME_API_PORT:-7777}" # URL encode the storage path for the query parameter # Replace special characters with their URL encoded equivalents local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g') # Construct API URL with encoded storage parameter local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}" # Only log in debug mode if [[ "$LUMIER_DEBUG" == "1" ]]; then echo "Deleting VM $vm_name from storage $storage..." 
fi # Execute command and capture response local response if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then # Show output in debug mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -X DELETE \ "$api_url") echo "$response" else # Run silently in normal mode response=$(curl --connect-timeout 6000 \ --max-time 5000 \ -s \ -X DELETE \ "$api_url") fi } ``` -------------------------------------------------------------------------------- /libs/python/agent/benchmarks/utils.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """ Shared utilities for ScreenSpot-Pro benchmarking and interactive testing. """ import dotenv dotenv.load_dotenv() import asyncio import base64 import os import sys import subprocess as sp import statistics from datetime import datetime from io import BytesIO from typing import List, Union, Tuple, Optional from PIL import Image, ImageDraw from tqdm import tqdm import gc import torch # Add parent directory to path for imports sys.path.append(os.path.join(os.path.dirname(__file__), '..')) from agent.agent import ComputerAgent from models.base import ModelProtocol def get_gpu_memory() -> List[int]: """ Get GPU memory usage using nvidia-smi. Returns: List of free memory values in MB for each GPU """ try: command = "nvidia-smi --query-gpu=memory.free --format=csv" memory_free_info = sp.check_output(command.split()).decode('ascii').split('\n')[:-1][1:] memory_free_values = [int(x.split()[0]) for i, x in enumerate(memory_free_info)] return memory_free_values except (sp.CalledProcessError, FileNotFoundError, IndexError): # Fallback to torch if nvidia-smi is not available if torch.cuda.is_available(): device = torch.cuda.current_device() total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024 reserved = torch.cuda.memory_reserved(device) / 1024 / 1024 return [int(total - reserved)] return [0] def get_vram_usage() -> dict: """ Get current VRAM usage statistics. 
    Returns:
        Dictionary with VRAM usage info (in MB)
    """
    if torch.cuda.is_available():
        device = torch.cuda.current_device()
        allocated = torch.cuda.memory_allocated(device) / 1024 / 1024  # Convert to MB
        reserved = torch.cuda.memory_reserved(device) / 1024 / 1024  # Convert to MB
        total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
        return {
            'allocated_mb': allocated,
            'reserved_mb': reserved,
            'total_mb': total,
            'free_mb': total - reserved
        }
    else:
        # No CUDA device: report zeros so callers can still read the keys.
        return {
            'allocated_mb': 0.0,
            'reserved_mb': 0.0,
            'total_mb': 0.0,
            'free_mb': 0.0
        }


def get_available_models() -> List[Union[str, ModelProtocol]]:
    """
    Get list of available models for testing.

    Returns:
        List of model strings and model classes
    """
    local_provider = "huggingface-local/"  # Options: huggingface-local/ or mlx/
    # from models.gta1 import GTA1Model

    models = [
        # === ComputerAgent model strings ===
        "openai/computer-use-preview",
        "anthropic/claude-opus-4-20250514",
        # f"{local_provider}HelloKKMe/GTA1-7B",
        # f"{local_provider}HelloKKMe/GTA1-32B",
        "openai/computer-use-preview+openai/gpt-4o-mini",
        "anthropic/claude-opus-4-20250514+openai/gpt-4o-mini",

        # === Reference model classes ===
        # GTA1Model("HelloKKMe/GTA1-7B"),
        # GTA1Model("HelloKKMe/GTA1-32B"),
    ]
    return models


def is_click_in_bbox(click_coords: Optional[Tuple[int, int]], bbox: List[int]) -> bool:
    """
    Check if click coordinates are within the bounding box.

    Args:
        click_coords: (x, y) coordinates or None
        bbox: [x1, y1, x2, y2] bounding box

    Returns:
        True if click is within bbox, False otherwise
    """
    if click_coords is None:
        return False

    x, y = click_coords
    x1, y1, x2, y2 = bbox
    # Bounds are inclusive on both edges.
    return x1 <= x <= x2 and y1 <= y <= y2


def image_to_base64(image: Image.Image) -> str:
    """
    Convert PIL Image to base64 string.

    Args:
        image: PIL Image

    Returns:
        Base64 encoded image string
    """
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()


class ModelWrapper:
    """
    Wrapper to provide unified interface for both ComputerAgent and custom models.
    """

    def __init__(self, model: Union[str, ModelProtocol]):
        self.model = model
        # A plain string is treated as a ComputerAgent model spec.
        self.is_computer_agent = isinstance(model, str)
        self.agent: Optional[ComputerAgent] = None
        self.vram_usage_history: List[float] = []  # Track VRAM usage over time

        if self.is_computer_agent:
            self.model_name = str(model)
        else:
            self.model_name = f"{model.__class__.__name__}('{getattr(model, 'model_name', 'unknown')}')"

    async def load_model(self) -> None:
        """Load the model."""
        if self.is_computer_agent:
            self.agent = ComputerAgent(model=str(self.model))
        else:
            await self.model.load_model()  # type: ignore

        # Record initial VRAM usage after loading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])

    async def unload_model(self) -> None:
        """Unload the model."""
        if not self.is_computer_agent:
            await self.model.unload_model()  # type: ignore
        else:
            del self.agent
            self.agent = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        # Record VRAM usage after unloading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])

    def get_vram_stats(self) -> dict:
        """Get VRAM usage statistics for this model."""
        if not self.vram_usage_history:
            return {'max_mb': 0.0, 'avg_mb': 0.0}
        return {
            'max_mb': max(self.vram_usage_history),
            'avg_mb': sum(self.vram_usage_history) / len(self.vram_usage_history)
        }

    async def predict_click(self, image: Image.Image, instruction: str) -> Optional[Tuple[int, int]]:
        """Predict click coordinates."""
        # Record VRAM usage before prediction
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])

        if self.is_computer_agent:
            # Lazily load the agent on first use.
            if self.agent is None:
                await self.load_model()
            if self.agent is not None:
                image_b64 = image_to_base64(image)
                result = await self.agent.predict_click(instruction=instruction, image_b64=image_b64)
                # Record VRAM usage after prediction
                vram_info = get_vram_usage()
                self.vram_usage_history.append(vram_info['allocated_mb'])
                return result
            return None
        else:
            result = await self.model.predict_click(image, instruction)  # type: ignore
            # Record VRAM usage after prediction
            vram_info = get_vram_usage()
            self.vram_usage_history.append(vram_info['allocated_mb'])
            return result


def save_results_to_markdown(all_results: List[dict], output_file: str = "screenspot_pro_results.md", title: str = "ScreenSpot-Pro Benchmark Results") -> None:
    """
    Save evaluation results to a markdown table.

    Args:
        all_results: List of evaluation results for each model
        output_file: Output markdown file path
        title: Heading written at the top of the report
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"# {title}\n\n")
        f.write(f"**Evaluation Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

        # Summary table
        f.write("## Summary\n\n")
        f.write("| Model | Total Samples | Correct | Errors | Accuracy | Error Rate | Avg Time (s) | Median Time (s) | Time Range (s) | VRAM Max (GB) | VRAM Avg (GB) |\n")
        f.write("|-------|---------------|---------|--------|----------|------------|--------------|-----------------|----------------|---------------|---------------|\n")

        for result in all_results:
            model_name = result['model_name']
            total = result['total_samples']
            correct = result['correct_predictions']
            errors = result['failed_predictions']
            accuracy = result['accuracy'] * 100
            error_rate = result['failure_rate'] * 100
            avg_time = result.get('avg_prediction_time', 0.0)
            median_time = result.get('median_prediction_time', 0.0)
            min_time = result.get('min_prediction_time', 0.0)
            max_time = result.get('max_prediction_time', 0.0)
            time_range = f"{min_time:.2f} - {max_time:.2f}"
            # VRAM values stored in MB; report in GB.
            vram_max = result.get('vram_max_mb', 0.0) / 1024
            vram_avg = result.get('vram_avg_mb', 0.0) / 1024

            f.write(f"| {model_name} | {total} | {correct} | {errors} | {accuracy:.2f}%
| {error_rate:.2f}% | {avg_time:.2f} | {median_time:.2f} | {time_range} | {vram_max:.1f} | {vram_avg:.1f} |\n") # Detailed results for each model for result in all_results: f.write(f"\n## {result['model_name']} - Detailed Results\n\n") f.write("| Sample Index | Instruction | BBox | Predicted | Correct | Error | Time (s) |\n") f.write("|-----------|-------------|------|-----------|---------|-------|----------|\n") for sample_result in result['results'][:10]: # Show first 10 samples sample_idx = sample_result['sample_idx'] instruction = sample_result['instruction'][:50] + "..." if len(sample_result['instruction']) > 50 else sample_result['instruction'] bbox = str(sample_result['bbox']) predicted = str(sample_result['predicted_coords']) if sample_result['predicted_coords'] else "None" correct = "PASS" if sample_result['is_correct'] else "FAIL" error = "YES" if sample_result['failed'] else "NO" pred_time = sample_result.get('prediction_time', 0.0) f.write(f"| {sample_idx} | {instruction} | {bbox} | {predicted} | {correct} | {error} | {pred_time:.2f} |\n") if len(result['results']) > 10: f.write(f"\n*Showing first 10 of {len(result['results'])} samples*\n") print(f"\nResults saved to: {output_file}") def save_visualizations(all_results: List[dict], samples, output_dir: str = "output") -> None: """ Save visualizations of predicted coordinates vs bboxes to an output folder. 
def save_visualizations(all_results: List[dict], samples, output_dir: str = "output") -> None:
    """Render ground-truth boxes and predicted clicks onto sample images.

    For each model, up to 10 annotated screenshots are written under
    output_dir/<sanitized model name>/.

    Args:
        all_results: List of evaluation results for each model
        samples: List of sample dicts with image, bbox, instruction keys
        output_dir: Output directory path
    """
    os.makedirs(output_dir, exist_ok=True)

    for result in all_results:
        # Model names may contain path separators; neutralize them for the dir name.
        safe_name = result['model_name'].replace('/', '_').replace('\\', '_')
        model_dir = os.path.join(output_dir, safe_name)
        os.makedirs(model_dir, exist_ok=True)

        print(f"Saving visualizations for {result['model_name']}...")

        # Only the first 10 samples are visualized.
        for i, sample_result in enumerate(tqdm(result['results'][:10], desc=f"Saving {safe_name} visualizations")):
            sample_idx = sample_result['sample_idx']
            if sample_idx >= len(samples):
                print(f"Warning: Could not find sample at index {sample_idx}")
                continue
            canvas = samples[sample_idx]['image'].copy()  # copy so the original stays untouched

            is_correct = sample_result['is_correct']
            draw = ImageDraw.Draw(canvas)

            # Ground-truth bounding box in green.
            x1, y1, x2, y2 = sample_result['bbox']
            draw.rectangle([x1, y1, x2, y2], outline="green", width=3)
            draw.text((x1, y1-20), "Ground Truth", fill="green")

            # Predicted click crosshair: blue when correct, red otherwise.
            predicted_coords = sample_result['predicted_coords']
            if predicted_coords is not None:
                px, py = predicted_coords
                color = "blue" if is_correct else "red"
                reach = 15
                draw.line([(px-reach, py), (px+reach, py)], fill=color, width=3)
                draw.line([(px, py-reach), (px, py+reach)], fill=color, width=3)
                draw.text((px+10, py-20), f"Predicted ({px},{py})", fill=color)

            # Status banner and truncated instruction in the top-left corner.
            status = "CORRECT" if is_correct else "INCORRECT"
            draw.text((10, 10), f"Status: {status}", fill="blue" if is_correct else "red")
            draw.text((10, 30), f"Instruction: {sample_result['instruction'][:50]}...", fill="black")

            canvas.save(os.path.join(model_dir, f"sample_{i+1:02d}_idx{sample_idx}_{status.lower()}.png"))

    print(f"Visualizations saved to: {model_dir}")


def save_prediction_visualization(image: Image.Image, instruction: str, predictions: List[dict], output_file: str = "interactive_prediction.png") -> None:
    """Overlay several models' click predictions on a single image.

    Args:
        image: PIL Image to visualize
        instruction: Instruction text
        predictions: List of prediction dicts with keys: model_name, coords, error
        output_file: Output file path
    """
    # Distinct color per model, cycling when more than eight predictions.
    palette = ["red", "blue", "orange", "purple", "brown", "pink", "gray", "olive"]

    annotated = image.copy()
    draw = ImageDraw.Draw(annotated)

    for i, pred in enumerate(predictions):
        color = palette[i % len(palette)]
        label = pred['model_name']
        coords = pred.get('coords')

        if coords is None:
            # No prediction available: list the failure below the instruction.
            draw.text((10, 50 + i*20), f"{label}: ERROR - {pred.get('error')}", fill=color)
            continue

        px, py = coords
        reach = 20
        draw.line([(px-reach, py), (px+reach, py)], fill=color, width=4)
        draw.line([(px, py-reach), (px, py+reach)], fill=color, width=4)
        draw.text((px+15, py+15), f"{label}: ({px},{py})", fill=color)

    # Instruction banner at the top.
    draw.text((10, 10), f"Instruction: {instruction}", fill="black")

    annotated.save(output_file)
    print(f"Prediction visualization saved to: {output_file}")


def take_screenshot() -> Image.Image:
    """Capture the current screen via pyautogui.

    Returns:
        PIL Image of the screenshot

    Raises:
        ImportError: If pyautogui is not installed (a hint is printed first).
        Exception: Re-raised if the capture itself fails.
    """
    try:
        import pyautogui
        return pyautogui.screenshot()
    except ImportError:
        print("pyautogui not installed. Please install it with: pip install pyautogui")
        raise
    except Exception as e:
        print(f"Error taking screenshot: {e}")
        raise
def sanitize_image_urls(data: Any) -> Any:
    """
    Recursively search for 'image_url' keys and set their values to '[omitted]'.

    Args:
        data: Any data structure (dict, list, or primitive type)

    Returns:
        A deep copy of the data with all 'image_url' values replaced with '[omitted]'
    """
    if isinstance(data, dict):
        # Rebuild the dict, redacting image_url values and recursing elsewhere.
        return {
            key: "[omitted]" if key == "image_url" else sanitize_image_urls(value)
            for key, value in data.items()
        }
    elif isinstance(data, list):
        # Recursively sanitize each item in the list.
        return [sanitize_image_urls(item) for item in data]
    else:
        # Primitive types (str, int, bool, None, etc.) are returned as-is.
        return data


def extract_computer_call_outputs(items: List[Dict[str, Any]], screenshot_dir: Optional[Path]) -> List[Dict[str, Any]]:
    """
    Save any base64-encoded screenshots from computer_call_output entries to files
    and replace their image_url with the saved file path when a call_id is present.

    Only operates if screenshot_dir is provided and exists; otherwise returns
    items unchanged. Entries whose screenshot cannot be decoded or written keep
    their original data-URL, so no screenshot data is lost on failure.

    Args:
        items: List of message/result dicts potentially containing
            computer_call_output entries
        screenshot_dir: Directory to write screenshots into

    Returns:
        A new list with updated image_url fields when applicable.
    """
    if not items:
        return items
    if not screenshot_dir or not screenshot_dir.exists():
        return items

    updated: List[Dict[str, Any]] = []
    for item in items:
        # Work on a shallow copy; the nested 'output' dict is only copied if modified.
        msg = dict(item)
        try:
            if msg.get("type") == "computer_call_output":
                call_id = msg.get("call_id")
                output = msg.get("output", {})
                image_url = output.get("image_url")
                if call_id and isinstance(image_url, str) and image_url.startswith("data:"):
                    # Derive the extension from the MIME type, e.g. "data:image/png;base64,".
                    try:
                        ext = image_url.split(";", 1)[0].split("/")[-1] or "png"
                    except Exception:
                        ext = "png"
                    out_path = screenshot_dir / f"{call_id}.{ext}"
                    # Write the file only if it doesn't already exist.
                    if not out_path.exists():
                        try:
                            b64_payload = image_url.split(",", 1)[1]
                            img_bytes = base64.b64decode(b64_payload)
                            out_path.parent.mkdir(parents=True, exist_ok=True)
                            with open(out_path, "wb") as f:
                                f.write(img_bytes)
                        except Exception:
                            # Decode/write failed; leave this entry's image_url
                            # untouched below. (Previously the URL was rewritten
                            # to a non-existent path, losing the screenshot.)
                            pass
                    # Only point image_url at the file if it actually exists on disk.
                    if out_path.exists():
                        new_output = dict(output)
                        new_output["image_url"] = str(out_path)
                        msg["output"] = new_output
        except Exception:
            # Do not block on malformed entries; keep the original message.
            pass
        updated.append(msg)
    return updated
class TrajectorySaverCallback(AsyncCallbackHandler):
    """
    Callback handler that saves agent trajectories to disk.

    Saves each run as a separate trajectory with unique ID, and each turn
    within the trajectory gets its own folder with screenshots and responses.

    On-disk layout:
        trajectory_dir/<trajectory_id>/metadata.json
        trajectory_dir/<trajectory_id>/turn_NNN/AAAA_<name>.(json|png)
    where NNN is the zero-padded turn index and AAAA is a per-run artifact counter.
    """

    def __init__(self, trajectory_dir: str, reset_on_run: bool = True, screenshot_dir: Optional[str] = None):
        """
        Initialize trajectory saver.

        Args:
            trajectory_dir: Base directory to save trajectories
            reset_on_run: If True, reset trajectory_id/turn/artifact on each run.
                          If False, continue using existing trajectory_id if set.
            screenshot_dir: Optional directory into which base64 screenshots found
                in run messages/new items are extracted as files (see
                extract_computer_call_outputs).
        """
        self.trajectory_dir = Path(trajectory_dir)
        self.trajectory_id: Optional[str] = None
        self.current_turn: int = 0
        self.current_artifact: int = 0
        self.model: Optional[str] = None
        self.total_usage: Dict[str, Any] = {}
        self.reset_on_run = reset_on_run
        # Optional directory to store extracted screenshots from metadata/new_items
        self.screenshot_dir: Optional[Path] = Path(screenshot_dir) if screenshot_dir else None

        # Ensure trajectory directory exists
        self.trajectory_dir.mkdir(parents=True, exist_ok=True)

    def _get_turn_dir(self) -> Path:
        """Get (creating it if needed) the directory for the current turn.

        Raises:
            ValueError: If no trajectory has been initialized yet.
        """
        if not self.trajectory_id:
            raise ValueError("Trajectory not initialized - call _on_run_start first")
        # format: trajectory_id/turn_000
        turn_dir = self.trajectory_dir / self.trajectory_id / f"turn_{self.current_turn:03d}"
        turn_dir.mkdir(parents=True, exist_ok=True)
        return turn_dir

    def _save_artifact(self, name: str, artifact: Union[str, bytes, Dict[str, Any]]) -> None:
        """Save an artifact to the current turn directory.

        bytes are written as .png files; str/dict values are serialized as JSON
        with all image_url values redacted. The monotonically increasing
        artifact counter keeps files in chronological order within the turn.
        """
        turn_dir = self._get_turn_dir()

        if isinstance(artifact, bytes):
            # format: turn_000/0000_name.png
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.png"
            with open(artifact_path, "wb") as f:
                f.write(artifact)
        else:
            # format: turn_000/0000_name.json
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.json"
            # add created_at (uuid1 timestamp: 100-ns ticks since 1582-10-15)
            if isinstance(artifact, dict):
                artifact = artifact.copy()
                artifact["created_at"] = str(uuid.uuid1().time)
            with open(artifact_path, "w") as f:
                json.dump(sanitize_image_urls(artifact), f, indent=2)

        self.current_artifact += 1

    def _update_usage(self, usage: Dict[str, Any]) -> None:
        """Update total usage statistics by recursively summing numeric leaves."""
        def add_dicts(target: Dict[str, Any], source: Dict[str, Any]) -> None:
            # Nested dicts are merged key-by-key; numeric leaves accumulate.
            for key, value in source.items():
                if isinstance(value, dict):
                    if key not in target:
                        target[key] = {}
                    add_dicts(target[key], value)
                else:
                    if key not in target:
                        target[key] = 0
                    target[key] += value
        add_dicts(self.total_usage, usage)

    @override
    async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize trajectory tracking for a new run.

        Creates the trajectory directory and writes an initial metadata.json
        with status "running". When reset_on_run is False and a trajectory is
        already active, only the model name is refreshed.
        """
        model = kwargs.get("model", "unknown")

        # Only reset trajectory state if reset_on_run is True or no trajectory exists
        if self.reset_on_run or not self.trajectory_id:
            # Short model tag: last path segment, or "<prefix4>_<segment>" for
            # composed "a+b" model strings.
            model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
            if "+" in model:
                model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
            # strip non-alphanumeric characters from model_name_short
            model_name_short = ''.join(c for c in model_name_short if c.isalnum() or c == '_')
            # id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
            now = datetime.now()
            self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
            self.current_turn = 0
            self.current_artifact = 0
            self.model = model
            self.total_usage = {}

            # Create trajectory directory
            trajectory_path = self.trajectory_dir / self.trajectory_id
            trajectory_path.mkdir(parents=True, exist_ok=True)

            # Save trajectory metadata (optionally extract screenshots to screenshot_dir)
            kwargs_to_save = kwargs.copy()
            try:
                if "messages" in kwargs_to_save:
                    kwargs_to_save["messages"] = extract_computer_call_outputs(
                        kwargs_to_save["messages"], self.screenshot_dir
                    )
            except Exception:
                # If extraction fails, fall back to original messages
                pass
            metadata = {
                "trajectory_id": self.trajectory_id,
                "created_at": str(uuid.uuid1().time),
                "status": "running",
                "kwargs": kwargs_to_save,
            }
            with open(trajectory_path / "metadata.json", "w") as f:
                json.dump(metadata, f, indent=2)
        else:
            # Continue with existing trajectory - just update model if needed
            self.model = model

    @override
    async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None:
        """Finalize run tracking by updating metadata with completion status, usage, and new items."""
        if not self.trajectory_id:
            return

        # Update metadata with completion status, total usage, and new items
        trajectory_path = self.trajectory_dir / self.trajectory_id
        metadata_path = trajectory_path / "metadata.json"

        # Read existing metadata
        if metadata_path.exists():
            with open(metadata_path, "r") as f:
                metadata = json.load(f)
        else:
            metadata = {}

        # Update metadata with completion info
        # Optionally extract screenshots from new_items before persisting
        new_items_to_save = new_items
        try:
            new_items_to_save = extract_computer_call_outputs(new_items, self.screenshot_dir)
        except Exception:
            pass
        metadata.update({
            "status": "completed",
            "completed_at": str(uuid.uuid1().time),
            "total_usage": self.total_usage,
            "new_items": new_items_to_save,
            "total_turns": self.current_turn
        })

        # Save updated metadata
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)

    @override
    async def on_api_start(self, kwargs: Dict[str, Any]) -> None:
        """Save the arguments of an outgoing API call as an artifact."""
        if not self.trajectory_id:
            return

        self._save_artifact("api_start", {
            "kwargs": kwargs
        })

    @override
    async def on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
        """Save API call result."""
        if not self.trajectory_id:
            return

        self._save_artifact("api_result", {
            "kwargs": kwargs,
            "result": result
        })

    @override
    async def on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
        """Save a screenshot.

        Accepts raw bytes or a base64-encoded string.
        NOTE(review): unlike the other hooks there is no trajectory_id guard here,
        so _get_turn_dir raises ValueError if this fires before on_run_start —
        confirm whether that is intended.
        """
        if isinstance(screenshot, str):
            screenshot = base64.b64decode(screenshot)
        self._save_artifact(name, screenshot)

    @override
    async def on_usage(self, usage: Dict[str, Any]) -> None:
        """Called when usage information is received."""
        self._update_usage(usage)

    @override
    async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
        """Save responses to the current turn directory and update usage statistics."""
        if not self.trajectory_id:
            return

        # Save responses
        # Side effect only: ensures the turn directory exists (return value unused).
        turn_dir = self._get_turn_dir()

        response_data = {
            "timestamp": str(uuid.uuid1().time),
            "model": self.model,
            "kwargs": kwargs,
            "response": responses
        }

        self._save_artifact("agent_response", response_data)

        # Increment turn counter
        self.current_turn += 1

    def _draw_crosshair_on_image(self, image_bytes: bytes, x: int, y: int) -> bytes:
        """
        Draw a red dot and crosshair at the specified coordinates on the image.

        Args:
            image_bytes: The original image as bytes
            x: X coordinate for the crosshair
            y: Y coordinate for the crosshair

        Returns:
            Modified image as bytes with red dot and crosshair
        """
        # Open the image
        image = Image.open(io.BytesIO(image_bytes))
        draw = ImageDraw.Draw(image)

        # Draw crosshair lines (red, 2px thick)
        crosshair_size = 20
        line_width = 2
        color = "red"

        # Horizontal line
        draw.line([(x - crosshair_size, y), (x + crosshair_size, y)], fill=color, width=line_width)
        # Vertical line
        draw.line([(x, y - crosshair_size), (x, y + crosshair_size)], fill=color, width=line_width)

        # Draw center dot (filled circle)
        dot_radius = 3
        draw.ellipse([(x - dot_radius, y - dot_radius), (x + dot_radius, y + dot_radius)], fill=color)

        # Convert back to bytes
        output = io.BytesIO()
        image.save(output, format='PNG')
        return output.getvalue()

    @override
    async def on_computer_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None:
        """
        Called when a computer call has completed.
        Saves screenshots and computer call output. For click-style actions
        carrying x/y coordinates, also saves a copy of the first returned
        screenshot annotated with a crosshair at the click point.
        """
        if not self.trajectory_id:
            return

        self._save_artifact("computer_call_result", {
            "item": item,
            "result": result
        })

        # Check if action has x/y coordinates and there's a screenshot in the result
        action = item.get("action", {})
        if "x" in action and "y" in action:
            # Look for screenshot in the result
            for result_item in result:
                if (result_item.get("type") == "computer_call_output" and
                    result_item.get("output", {}).get("type") == "input_image"):

                    image_url = result_item["output"]["image_url"]

                    # Extract base64 image data
                    if image_url.startswith("data:image/"):
                        # Format: data:image/png;base64,<base64_data>
                        base64_data = image_url.split(",", 1)[1]
                    else:
                        # Assume it's just base64 data
                        base64_data = image_url

                    try:
                        # Decode the image
                        image_bytes = base64.b64decode(base64_data)

                        # Draw crosshair at the action coordinates
                        annotated_image = self._draw_crosshair_on_image(
                            image_bytes,
                            int(action["x"]),
                            int(action["y"])
                        )

                        # Save as screenshot_action
                        self._save_artifact("screenshot_action", annotated_image)

                    except Exception as e:
                        # If annotation fails, just log and continue
                        print(f"Failed to annotate screenshot: {e}")

                    break  # Only process the first screenshot found

        # Increment turn counter
        self.current_turn += 1
# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv
load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for entry in pythonpath.split(":"):
    if entry and entry not in sys.path:
        sys.path.insert(0, entry)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {entry}")

from computer import Computer, VMProviderType


@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Remote Linux container provisioned through the Cua cloud provider
    machine = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )

    # Alternative setups, kept for reference:
    #   machine = Computer()                               # local macOS VM with Cua
    #   machine = Computer(use_host_computer_server=True)  # host computer server

    try:
        await machine.run()
        yield machine
    finally:
        await machine.disconnect()


@pytest.mark.asyncio(loop_scope="session")
async def test_file_exists(computer):
    path = "test_file_exists.txt"

    # Start from a clean slate
    if await computer.interface.file_exists(path):
        await computer.interface.delete_file(path)
    present = await computer.interface.file_exists(path)
    assert present is False, f"File {path} should not exist"

    # Creating the file must flip the answer
    await computer.interface.write_text(path, "hello")
    present = await computer.interface.file_exists(path)
    assert present is True, f"File {path} should exist"

    await computer.interface.delete_file(path)


@pytest.mark.asyncio(loop_scope="session")
async def test_directory_exists(computer):
    dirname = "test_directory_exists"

    if await computer.interface.directory_exists(dirname):
        # Remove all files in directory before removing directory
        for name in await computer.interface.list_dir(dirname):
            await computer.interface.delete_file(f"{dirname}/{name}")
        # Remove the directory itself
        await computer.interface.delete_dir(dirname)
    found = await computer.interface.directory_exists(dirname)
    assert found is False, f"Directory {dirname} should not exist"

    await computer.interface.create_dir(dirname)
    found = await computer.interface.directory_exists(dirname)
    assert found is True, f"Directory {dirname} should exist"

    # Cleanup: remove files and directory
    for name in await computer.interface.list_dir(dirname):
        await computer.interface.delete_file(f"{dirname}/{name}")
    await computer.interface.delete_dir(dirname)


@pytest.mark.asyncio(loop_scope="session")
async def test_list_dir(computer):
    dirname = "test_list_dir"
    if not await computer.interface.directory_exists(dirname):
        await computer.interface.create_dir(dirname)

    names = ["foo.txt", "bar.txt"]
    for name in names:
        await computer.interface.write_text(f"{dirname}/{name}", "hi")

    listing = await computer.interface.list_dir(dirname)
    assert set(listing) >= set(names), f"Directory {dirname} should contain files {names}"

    for name in names:
        await computer.interface.delete_file(f"{dirname}/{name}")
    await computer.interface.delete_dir(dirname)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text(computer):
    path = "test_rw_text.txt"
    payload = "sample text"
    await computer.interface.write_text(path, payload)
    roundtrip = await computer.interface.read_text(path)
    assert roundtrip == payload, "File content should match"
    await computer.interface.delete_file(path)


@pytest.mark.asyncio(loop_scope="session")
async def test_delete_file(computer):
    path = "test_delete_file.txt"
    await computer.interface.write_text(path, "bye")
    present = await computer.interface.file_exists(path)
    assert present is True, "File should exist"
    await computer.interface.delete_file(path)
    present = await computer.interface.file_exists(path)
    assert present is False, "File should not exist"
@pytest.mark.asyncio(loop_scope="session")
async def test_create_dir(computer):
    dirname = "test_create_dir"
    if await computer.interface.directory_exists(dirname):
        await computer.interface.delete_dir(dirname)
    await computer.interface.create_dir(dirname)
    created = await computer.interface.directory_exists(dirname)
    assert created is True, "Directory should exist"
    await computer.interface.delete_dir(dirname)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_basic(computer):
    """Test basic read_bytes functionality."""
    path = "test_read_bytes.bin"
    payload = b"Hello, World! This is binary data \x00\x01\x02\x03"

    # Write binary data using write_text (assuming it handles bytes)
    await computer.interface.write_text(path, payload.decode('latin-1'))

    # Read all bytes back and compare
    roundtrip = await computer.interface.read_bytes(path)
    assert roundtrip == payload, "Binary data should match"

    await computer.interface.delete_file(path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_with_offset_and_length(computer):
    """Test read_bytes with offset and length parameters."""
    path = "test_read_bytes_offset.bin"
    payload = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

    await computer.interface.write_text(path, payload.decode('latin-1'))

    # Offset only
    got = await computer.interface.read_bytes(path, offset=5)
    want = payload[5:]
    assert got == want, f"Data from offset 5 should match. Got: {got}, Expected: {want}"

    # Offset and length together
    got = await computer.interface.read_bytes(path, offset=10, length=5)
    want = payload[10:15]
    assert got == want, f"Data from offset 10, length 5 should match. Got: {got}, Expected: {want}"

    # From the beginning with an explicit length
    got = await computer.interface.read_bytes(path, offset=0, length=10)
    want = payload[:10]
    assert got == want, f"Data from beginning, length 10 should match. Got: {got}, Expected: {want}"

    await computer.interface.delete_file(path)


@pytest.mark.asyncio(loop_scope="session")
async def test_get_file_size(computer):
    """Test get_file_size functionality."""
    path = "test_file_size.txt"
    await computer.interface.write_text(path, "A" * 1000)  # exactly 1000 bytes
    size = await computer.interface.get_file_size(path)
    assert size == 1000, f"File size should be 1000 bytes, got {size}"
    await computer.interface.delete_file(path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_large_file(computer):
    """Test reading a file larger than 10MB to verify chunked reading."""
    path = "test_large_file.bin"

    # 12MB, comfortably above the 10MB (10 * 1024 * 1024 bytes) chunking threshold
    total_size = 12 * 1024 * 1024
    print(f"Creating large file of {total_size} bytes ({total_size / (1024*1024):.1f}MB)...")

    # Writing this blob exercises the chunked writing path
    blob = b"X" * total_size
    await computer.interface.write_bytes(path, blob)

    size = await computer.interface.get_file_size(path)
    assert size == total_size, f"Large file size should be {total_size} bytes, got {size}"
    print(f"Large file created successfully: {size} bytes")

    # Reading the whole file should use chunked reading
    print("Reading large file...")
    data = await computer.interface.read_bytes(path)
    assert len(data) == total_size, f"Read data size should match file size. Got {len(data)}, expected {total_size}"
    assert data == blob, "Large file content should be all 'X' characters"
    print("Large file read successfully!")

    # Partial read across the middle of the file
    offset = 5 * 1024 * 1024  # 5MB offset
    length = 2 * 1024 * 1024  # 2MB length
    part = await computer.interface.read_bytes(path, offset=offset, length=length)
    assert len(part) == length, f"Partial read size should be {length}, got {len(part)}"
    assert part == b"X" * length, "Partial read content should be all 'X' characters"
    print("Large file partial read successful!")

    # Clean up
    await computer.interface.delete_file(path)
    print("Large file test completed successfully!")
@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text_with_encoding(computer):
    """Test reading and writing text files with different encodings."""
    print("Testing text file operations with different encodings...")
    path = "test_encoding.txt"

    # (content, encoding, label) triples exercised as round-trips
    cases = [
        ("Hello, 世界! 🌍 Ñoño café", 'utf-8', "UTF-8"),
        ("Hello, World! Simple ASCII text.", 'ascii', "ASCII"),
        ("Café, naïve, résumé", 'latin-1', "Latin-1"),
    ]
    for content, encoding, label in cases:
        await computer.interface.write_text(path, content, encoding=encoding)
        roundtrip = await computer.interface.read_text(path, encoding=encoding)
        assert roundtrip == content, f"{label} content should match"

    # Clean up
    await computer.interface.delete_file(path)
    print("Text encoding test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_write_text_append_mode(computer):
    """Test appending text to files."""
    print("Testing text file append mode...")
    path = "test_append.txt"

    first = "First line\n"
    second = "Second line\n"
    third = "Third line\n"

    # Initial write, then one append
    await computer.interface.write_text(path, first)
    await computer.interface.write_text(path, second, append=True)

    expected = first + second
    actual = await computer.interface.read_text(path)
    assert actual == expected, f"Expected '{expected}', got '{actual}'"

    # A second append must accumulate as well
    await computer.interface.write_text(path, third, append=True)

    expected = first + second + third
    actual = await computer.interface.read_text(path)
    assert actual == expected, f"Expected '{expected}', got '{actual}'"

    # Clean up
    await computer.interface.delete_file(path)
    print("Text append test completed successfully!")


@pytest.mark.asyncio(loop_scope="session")
async def test_large_text_file(computer):
    """Test reading and writing large text files (>5MB) to verify chunked operations."""
    print("Testing large text file operations...")
    path = "test_large_text.txt"

    # Each line is about 100 characters, so 60,000 lines is roughly 6MB of text
    line_template = "This is line {:06d} with some additional text to make it longer and reach about 100 chars.\n"
    num_lines = 60000
    print(f"Generating large text content with {num_lines} lines...")
    big_text = "".join(line_template.format(i) for i in range(num_lines))

    content_size_mb = len(big_text.encode('utf-8')) / (1024 * 1024)
    print(f"Generated text content size: {content_size_mb:.2f} MB")

    print("Writing large text file...")
    await computer.interface.write_text(path, big_text)

    print("Reading large text file...")
    roundtrip = await computer.interface.read_text(path)
    assert roundtrip == big_text, "Large text file content should match exactly"

    # Partial reads via read_bytes, decoding a UTF-8 prefix
    print("Testing partial text reading...")
    prefix = big_text[:1000]
    prefix_bytes = prefix.encode('utf-8')
    raw = await computer.interface.read_bytes(path, offset=0, length=len(prefix_bytes))
    assert raw.decode('utf-8') == prefix, "Partial text reading should match"

    print("Testing append to large text file...")
    suffix = "\n--- APPENDED CONTENT ---\nThis content was appended to the large file.\n"
    await computer.interface.write_text(path, suffix, append=True)

    final = await computer.interface.read_text(path)
    assert final == big_text + suffix, "Appended large text file should match"

    # Clean up
    await computer.interface.delete_file(path)
    print("Large text file test completed successfully!")
@pytest.mark.asyncio(loop_scope="session") async def test_text_file_edge_cases(computer): """Test edge cases for text file operations.""" print("Testing text file edge cases...") tmp_path = "test_edge_cases.txt" # Test empty file empty_content = "" await computer.interface.write_text(tmp_path, empty_content) read_empty = await computer.interface.read_text(tmp_path) assert read_empty == empty_content, "Empty file should return empty string" # Test file with only whitespace whitespace_content = " \n\t\r\n \n" await computer.interface.write_text(tmp_path, whitespace_content) read_whitespace = await computer.interface.read_text(tmp_path) assert read_whitespace == whitespace_content, "Whitespace content should be preserved" # Test file with special characters and newlines special_content = "Line 1\nLine 2\r\nLine 3\tTabbed\nSpecial: !@#$%^&*()\n" await computer.interface.write_text(tmp_path, special_content) read_special = await computer.interface.read_text(tmp_path) assert read_special == special_content, "Special characters should be preserved" # Test very long single line (no newlines) long_line = "A" * 10000 # 10KB single line await computer.interface.write_text(tmp_path, long_line) read_long_line = await computer.interface.read_text(tmp_path) assert read_long_line == long_line, "Long single line should be preserved" # Clean up await computer.interface.delete_file(tmp_path) print("Text file edge cases test completed successfully!") if __name__ == "__main__": # Run tests directly pytest.main([__file__, "-v"]) ``` -------------------------------------------------------------------------------- /docs/src/app/(home)/[[...slug]]/page.tsx: -------------------------------------------------------------------------------- ```typescript import { getApiVersions, source } from '@/lib/source'; import { getMDXComponents } from '@/mdx-components'; import { buttonVariants } from 'fumadocs-ui/components/ui/button'; import { Popover, PopoverContent, PopoverTrigger, } from 
'fumadocs-ui/components/ui/popover'; import { createRelativeLink } from 'fumadocs-ui/mdx'; import { DocsBody, DocsDescription, DocsPage, DocsTitle, } from 'fumadocs-ui/page'; import { cn } from 'fumadocs-ui/utils/cn'; import { ChevronDown, CodeXml, ExternalLink } from 'lucide-react'; import type { Metadata } from 'next'; import Link from 'next/link'; import { notFound, redirect } from 'next/navigation'; export default async function Page(props: { params: Promise<{ slug?: string[] }>; }) { const params = await props.params; const slug = params.slug || []; const page = source.getPage(slug); if (!page) notFound(); //redirect('/docs'); // Detect if this is an API reference page: /api/[section] or /api/[section]/[version] let apiSection: string | null = null; let apiVersionSlug: string[] = []; if (slug[0] === 'api' && slug.length >= 2) { apiSection = slug[1]; if (slug.length > 2) { apiVersionSlug = slug.slice(2); } } let versionItems: { label: string; slug: string[] }[] = []; if (apiSection) { versionItems = await getApiVersions(apiSection); } const macos = page.data.macos; const windows = page.data.windows; const linux = page.data.linux; const pypi = page.data.pypi; const npm = page.data.npm; const github = page.data.github; const MDXContent = page.data.body; // Platform icons component const PlatformIcons = () => { const hasAnyPlatform = macos || windows || linux; if (!hasAnyPlatform && !pypi) return null; return ( <div className="flex flex-col gap-2"> {hasAnyPlatform && ( <div className="flex flex-row gap-2 items-left dark:text-neutral-400"> {windows && ( <svg xmlns="http://www.w3.org/2000/svg" fill="currentColor" className="h-5" viewBox="0 0 448 512"> <title>Windows</title> <path d="M0 93.7l183.6-25.3v177.4H0V93.7zm0 324.6l183.6 25.3V268.4H0v149.9zm203.8 28L448 480V268.4H203.8v177.9zm0-380.6v180.1H448V32L203.8 65.7z" /> </svg> )} {macos && ( <svg xmlns="http://www.w3.org/2000/svg" fill="currentColor" className="h-5" viewBox="0 0 384 512"> <title>macOS</title> <path 
d="M318.7 268.7c-.2-36.7 16.4-64.4 50-84.8-18.8-26.9-47.2-41.7-84.7-44.6-35.5-2.8-74.3 20.7-88.5 20.7-15 0-49.4-19.7-76.4-19.7C63.3 141.2 4 184.8 4 273.5q0 39.3 14.4 81.2c12.8 36.7 59 126.7 107.2 125.2 25.2-.6 43-17.9 75.8-17.9 31.8 0 48.3 17.9 76.4 17.9 48.6-.7 90.4-82.5 102.6-119.3-65.2-30.7-61.7-90-61.7-91.9zm-56.6-164.2c27.3-32.4 24.8-61.9 24-72.5-24.1 1.4-52 16.4-67.9 34.9-17.5 19.8-27.8 44.3-25.6 71.9 26.1 2 49.9-11.4 69.5-34.3z" /> </svg> )} {linux && ( <svg xmlns="http://www.w3.org/2000/svg" fill="currentColor" className="h-5" viewBox="0 0 448 512"> <title>Linux</title> <path d="M220.8 123.3c1 .5 1.8 1.7 3 1.7 1.1 0 2.8-.4 2.9-1.5 .2-1.4-1.9-2.3-3.2-2.9-1.7-.7-3.9-1-5.5-.1-.4 .2-.8 .7-.6 1.1 .3 1.3 2.3 1.1 3.4 1.7zm-21.9 1.7c1.2 0 2-1.2 3-1.7 1.1-.6 3.1-.4 3.5-1.6 .2-.4-.2-.9-.6-1.1-1.6-.9-3.8-.6-5.5 .1-1.3 .6-3.4 1.5-3.2 2.9 .1 1 1.8 1.5 2.8 1.4zM420 403.8c-3.6-4-5.3-11.6-7.2-19.7-1.8-8.1-3.9-16.8-10.5-22.4-1.3-1.1-2.6-2.1-4-2.9-1.3-.8-2.7-1.5-4.1-2 9.2-27.3 5.6-54.5-3.7-79.1-11.4-30.1-31.3-56.4-46.5-74.4-17.1-21.5-33.7-41.9-33.4-72C311.1 85.4 315.7 .1 234.8 0 132.4-.2 158 103.4 156.9 135.2c-1.7 23.4-6.4 41.8-22.5 64.7-18.9 22.5-45.5 58.8-58.1 96.7-6 17.9-8.8 36.1-6.2 53.3-6.5 5.8-11.4 14.7-16.6 20.2-4.2 4.3-10.3 5.9-17 8.3s-14 6-18.5 14.5c-2.1 3.9-2.8 8.1-2.8 12.4 0 3.9 .6 7.9 1.2 11.8 1.2 8.1 2.5 15.7 .8 20.8-5.2 14.4-5.9 24.4-2.2 31.7 3.8 7.3 11.4 10.5 20.1 12.3 17.3 3.6 40.8 2.7 59.3 12.5 19.8 10.4 39.9 14.1 55.9 10.4 11.6-2.6 21.1-9.6 25.9-20.2 12.5-.1 26.3-5.4 48.3-6.6 14.9-1.2 33.6 5.3 55.1 4.1 .6 2.3 1.4 4.6 2.5 6.7v.1c8.3 16.7 23.8 24.3 40.3 23 16.6-1.3 34.1-11 48.3-27.9 13.6-16.4 36-23.2 50.9-32.2 7.4-4.5 13.4-10.1 13.9-18.3 .4-8.2-4.4-17.3-15.5-29.7zM223.7 87.3c9.8-22.2 34.2-21.8 44-.4 6.5 14.2 3.6 30.9-4.3 40.4-1.6-.8-5.9-2.6-12.6-4.9 1.1-1.2 3.1-2.7 3.9-4.6 4.8-11.8-.2-27-9.1-27.3-7.3-.5-13.9 10.8-11.8 23-4.1-2-9.4-3.5-13-4.4-1-6.9-.3-14.6 2.9-21.8zM183 75.8c10.1 0 20.8 14.2 19.1 33.5-3.5 1-7.1 2.5-10.2 4.6 1.2-8.9-3.3-20.1-9.6-19.6-8.4 .7-9.8 
21.2-1.8 28.1 1 .8 1.9-.2-5.9 5.5-15.6-14.6-10.5-52.1 8.4-52.1zm-13.6 60.7c6.2-4.6 13.6-10 14.1-10.5 4.7-4.4 13.5-14.2 27.9-14.2 7.1 0 15.6 2.3 25.9 8.9 6.3 4.1 11.3 4.4 22.6 9.3 8.4 3.5 13.7 9.7 10.5 18.2-2.6 7.1-11 14.4-22.7 18.1-11.1 3.6-19.8 16-38.2 14.9-3.9-.2-7-1-9.6-2.1-8-3.5-12.2-10.4-20-15-8.6-4.8-13.2-10.4-14.7-15.3-1.4-4.9 0-9 4.2-12.3zm3.3 334c-2.7 35.1-43.9 34.4-75.3 18-29.9-15.8-68.6-6.5-76.5-21.9-2.4-4.7-2.4-12.7 2.6-26.4v-.2c2.4-7.6 .6-16-.6-23.9-1.2-7.8-1.8-15 .9-20 3.5-6.7 8.5-9.1 14.8-11.3 10.3-3.7 11.8-3.4 19.6-9.9 5.5-5.7 9.5-12.9 14.3-18 5.1-5.5 10-8.1 17.7-6.9 8.1 1.2 15.1 6.8 21.9 16l19.6 35.6c9.5 19.9 43.1 48.4 41 68.9zm-1.4-25.9c-4.1-6.6-9.6-13.6-14.4-19.6 7.1 0 14.2-2.2 16.7-8.9 2.3-6.2 0-14.9-7.4-24.9-13.5-18.2-38.3-32.5-38.3-32.5-13.5-8.4-21.1-18.7-24.6-29.9s-3-23.3-.3-35.2c5.2-22.9 18.6-45.2 27.2-59.2 2.3-1.7 .8 3.2-8.7 20.8-8.5 16.1-24.4 53.3-2.6 82.4 .6-20.7 5.5-41.8 13.8-61.5 12-27.4 37.3-74.9 39.3-112.7 1.1 .8 4.6 3.2 6.2 4.1 4.6 2.7 8.1 6.7 12.6 10.3 12.4 10 28.5 9.2 42.4 1.2 6.2-3.5 11.2-7.5 15.9-9 9.9-3.1 17.8-8.6 22.3-15 7.7 30.4 25.7 74.3 37.2 95.7 6.1 11.4 18.3 35.5 23.6 64.6 3.3-.1 7 .4 10.9 1.4 13.8-35.7-11.7-74.2-23.3-84.9-4.7-4.6-4.9-6.6-2.6-6.5 12.6 11.2 29.2 33.7 35.2 59 2.8 11.6 3.3 23.7 .4 35.7 16.4 6.8 35.9 17.9 30.7 34.8-2.2-.1-3.2 0-4.2 0 3.2-10.1-3.9-17.6-22.8-26.1-19.6-8.6-36-8.6-38.3 12.5-12.1 4.2-18.3 14.7-21.4 27.3-2.8 11.2-3.6 24.7-4.4 39.9-.5 7.7-3.6 18-6.8 29-32.1 22.9-76.7 32.9-114.3 7.2zm257.4-11.5c-.9 16.8-41.2 19.9-63.2 46.5-13.2 15.7-29.4 24.4-43.6 25.5s-26.5-4.8-33.7-19.3c-4.7-11.1-2.4-23.1 1.1-36.3 3.7-14.2 9.2-28.8 9.9-40.6 .8-15.2 1.7-28.5 4.2-38.7 2.6-10.3 6.6-17.2 13.7-21.1 .3-.2 .7-.3 1-.5 .8 13.2 7.3 26.6 18.8 29.5 12.6 3.3 30.7-7.5 38.4-16.3 9-.3 15.7-.9 22.6 5.1 9.9 8.5 7.1 30.3 17.1 41.6 10.6 11.6 14 19.5 13.7 24.6zM173.3 148.7c2 1.9 4.7 4.5 8 7.1 6.6 5.2 15.8 10.6 27.3 10.6 11.6 0 22.5-5.9 31.8-10.8 4.9-2.6 10.9-7 14.8-10.4s5.9-6.3 3.1-6.6-2.6 2.6-6 5.1c-4.4 3.2-9.7 7.4-13.9 9.8-7.4 
4.2-19.5 10.2-29.9 10.2s-18.7-4.8-24.9-9.7c-3.1-2.5-5.7-5-7.7-6.9-1.5-1.4-1.9-4.6-4.3-4.9-1.4-.1-1.8 3.7 1.7 6.5z" /> </svg> )} </div> )} <div className="flex flex-row gap-2 items-left"> {pypi && ( <a target="_blank" href={`https://pypi.org/project/${pypi}/`} rel="noreferrer"> <img src={`https://img.shields.io/pypi/v/${pypi}?color=blue`} className="h-5" alt="PyPI" /> </a> )} {npm && ( <a target="_blank" href={`https://www.npmjs.com/package/${npm}`} rel="noreferrer"> <img src={`https://img.shields.io/npm/v/${npm}?color=bf4c4b`} className="h-5" alt="NPM" /> </a> )} </div> </div> ); }; const tocHeader = () => { return ( <div className="w-fit"> <PlatformIcons /> <div className="flex gap-2 mt-2"> {github && github.length > 0 && (github.length === 1 ? ( <a href={github[0]} rel="noreferrer noopener" target="_blank" className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5" aria-label="Source" data-active="false"> <svg role="img" viewBox="0 0 24 24" fill="currentColor"> <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path> </svg> Source <ExternalLink 
className="w-4 h-4 ml-auto" /> </a> ) : ( <Popover> <PopoverTrigger className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5"> <svg role="img" viewBox="0 0 24 24" fill="currentColor"> <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path> </svg> Source <ChevronDown className="h-4 w-4" /> </PopoverTrigger> <PopoverContent className="w-48 p-1"> <div className="flex flex-col gap-1"> {github.map((link, index) => ( <a key={index} href={link} rel="noreferrer noopener" target="_blank" className="inline-flex gap-2 w-full items-center rounded-md p-2 text-sm hover:bg-fd-accent hover:text-fd-accent-foreground"> {link.includes('python') ? 'Python' : link.includes('typescript') ? 
'TypeScript' : `Source ${index + 1}`} <ExternalLink className="w-4 h-4 ml-auto" /> </a> ))} </div> </PopoverContent> </Popover> ))} {slug.includes('libraries') && ( <a className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5" href={`/api/${page.data.title.toLowerCase()}`}> <CodeXml size={12} /> Reference </a> )} </div> <hr className="my-2 border-t border-fd-border" /> </div> ); }; return ( <DocsPage toc={page.data.toc} tableOfContent={{ header: tocHeader() }} full={page.data.full}> <div className="flex flex-row w-full items-start"> <div className="flex-1"> <div className="flex flex-row w-full"> <DocsTitle>{page.data.title}</DocsTitle> <div className="ml-auto"> {apiSection && versionItems.length > 1 && ( <Popover> <PopoverTrigger className={cn( buttonVariants({ color: 'secondary', size: 'sm', className: 'gap-2', }) )}> {(() => { // Find the current version label let currentLabel = 'Current'; if (apiVersionSlug.length > 0) { const found = versionItems.find( (item) => item.label !== 'Current' && apiVersionSlug[0] === item.label ); if (found) currentLabel = found.label; } return ( <> API Version: {currentLabel} <ChevronDown className="size-3.5 text-fd-muted-foreground" /> </> ); })()} </PopoverTrigger> <PopoverContent className="flex flex-col overflow-auto"> {versionItems.map((item) => { // Build the href for each version const href = item.label === 'Current' ? 
`/api/${apiSection}` : `/api/${apiSection}/${item.label}`; // Highlight current version const isCurrent = (item.label === 'Current' && apiVersionSlug.length === 0) || (item.label !== 'Current' && apiVersionSlug[0] === item.label); return ( <Link key={item.label} href={href} className={cn( 'px-3 py-1 rounded hover:bg-fd-muted', isCurrent && 'font-bold bg-fd-muted' )}> API version: {item.label} </Link> ); })} </PopoverContent> </Popover> )} </div> </div> <DocsDescription className="text-md mt-1"> {page.data.description} </DocsDescription> </div> </div> <DocsBody> <MDXContent components={getMDXComponents({ // this allows you to link to other pages with relative file paths a: createRelativeLink(source, page), })} /> </DocsBody> </DocsPage> ); } export async function generateStaticParams() { return source.generateParams(); } export async function generateMetadata(props: { params: Promise<{ slug?: string[] }>; }): Promise<Metadata> { const params = await props.params; const page = source.getPage(params.slug); if (!page) notFound(); let title = `${page.data.title} | Cua Docs`; if (page.url.includes('api')) title = `${page.data.title} | Cua API Docs`; if (page.url.includes('guide')) title = ` Guide: ${page.data.title} | Cua Docs`; return { title, description: page.data.description, openGraph: { title, description: page.data.description, type: 'article', siteName: 'Cua Docs', url: 'https://trycua.com/docs', }, }; } ``` -------------------------------------------------------------------------------- /libs/python/mcp-server/mcp_server/server.py: -------------------------------------------------------------------------------- ```python import asyncio import base64 import inspect import logging import os import signal import sys import traceback import uuid from typing import Any, Dict, List, Optional, Union, Tuple import anyio # Configure logging to output to stderr for debug visibility logging.basicConfig( level=logging.DEBUG, # Changed to DEBUG format="%(asctime)s - %(name)s 
- %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger("mcp-server")

# More visible startup message
logger.debug("MCP Server module loading...")

# Import guards: each dependency group logs and exits loudly on failure so a
# broken install is diagnosable from stderr instead of a bare traceback.
try:
    from mcp.server.fastmcp import Context, FastMCP
    # Use the canonical Image type
    from mcp.server.fastmcp.utilities.types import Image
    logger.debug("Successfully imported FastMCP")
except ImportError as e:
    logger.error(f"Failed to import FastMCP: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from computer import Computer
    from agent import ComputerAgent
    logger.debug("Successfully imported Computer and Agent modules")
except ImportError as e:
    logger.error(f"Failed to import Computer/Agent modules: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from .session_manager import get_session_manager, initialize_session_manager, shutdown_session_manager
    logger.debug("Successfully imported session manager")
except ImportError as e:
    logger.error(f"Failed to import session manager: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)


def get_env_bool(key: str, default: bool = False) -> bool:
    """Get boolean value from environment variable."""
    # Accepts "true"/"1"/"yes" (case-insensitive); anything else is False.
    return os.getenv(key, str(default)).lower() in ("true", "1", "yes")


async def _maybe_call_ctx_method(ctx: Context, method_name: str, *args, **kwargs) -> None:
    """Call a context helper if it exists, awaiting the result when necessary.

    Tolerates Context implementations that lack the method (no-op) or that
    implement it as either a sync function or a coroutine.
    """
    method = getattr(ctx, method_name, None)
    if not callable(method):
        return
    result = method(*args, **kwargs)
    if inspect.isawaitable(result):
        await result


def _normalise_message_content(content: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Normalise message content to a list of structured parts.

    Strings (and other scalars) are wrapped as a single output_text part;
    None becomes an empty list; lists pass through unchanged.
    """
    if isinstance(content, list):
        return content
    if content is None:
        return []
    return [{"type": "output_text", "text": str(content)}]


def _extract_text_from_content(content: Union[str, List[Dict[str, Any]]]) -> str:
    """Extract textual content for inclusion in the aggregated result string."""
    if isinstance(content, str):
        return content
    texts: List[str] = []
    for part in content or []:
        if not isinstance(part, dict):
            # Skip non-dict parts rather than failing on malformed content.
            continue
        if part.get("type") in {"output_text", "text"} and part.get("text"):
            texts.append(str(part["text"]))
    return "\n".join(texts)


def _serialise_tool_content(content: Any) -> str:
    """Convert tool outputs into a string for aggregation."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts: List[str] = []
        for part in content:
            if isinstance(part, dict) and part.get("type") in {"output_text", "text"} and part.get("text"):
                texts.append(str(part["text"]))
        if texts:
            return "\n".join(texts)
        # Fall through: a list with no extractable text is stringified below.
    if content is None:
        return ""
    return str(content)


def serve() -> FastMCP:
    """Create and configure the MCP server."""
    # NOTE: Do not pass model_config here; FastMCP 2.12.x doesn't support it.
    server = FastMCP(name="cua-agent")

    @server.tool(structured_output=False)
    async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
        """
        Take a screenshot of the current MacOS VM screen and return the image.

        Args:
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()
        async with session_manager.get_session(session_id) as session:
            screenshot = await session.computer.interface.screenshot()
            # Returning Image object is fine when structured_output=False
            return Image(format="png", data=screenshot)

    @server.tool(structured_output=False)
    async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
        """
        Run a Computer-Use Agent (CUA) task in a MacOS VM and return (combined text, final screenshot).

        Args:
            task: The task description for the agent to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()
        # Unique id so concurrent tasks in one session can be tracked/unregistered.
        task_id = str(uuid.uuid4())
        try:
            logger.info(f"Starting CUA task: {task} (task_id: {task_id})")
            async with session_manager.get_session(session_id) as session:
                # Register this task with the session
                await session_manager.register_task(session.session_id, task_id)
                try:
                    # Get model name
                    model_name = os.getenv("CUA_MODEL_NAME", "anthropic/claude-3-5-sonnet-20241022")
                    logger.info(f"Using model: {model_name}")

                    # Create agent with the new v0.4.x API
                    agent = ComputerAgent(
                        model=model_name,
                        only_n_most_recent_images=int(os.getenv("CUA_MAX_IMAGES", "3")),
                        verbosity=logging.INFO,
                        tools=[session.computer],
                    )

                    messages = [{"role": "user", "content": task}]

                    # Collect all results
                    aggregated_messages: List[str] = []
                    async for result in agent.run(messages):
                        logger.info("Agent processing step")
                        # NOTE(review): Context.info/report_progress are coroutines in
                        # recent FastMCP releases; this bare call may be a no-op —
                        # confirm against the pinned mcp version.
                        ctx.info("Agent processing step")
                        outputs = result.get("output", [])
                        for output in outputs:
                            output_type = output.get("type")
                            if output_type == "message":
                                # Assistant text: aggregate and stream to the client.
                                logger.debug("Streaming assistant message: %s", output)
                                content = _normalise_message_content(output.get("content"))
                                aggregated_text = _extract_text_from_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                await _maybe_call_ctx_method(
                                    ctx,
                                    "yield_message",
                                    role=output.get("role", "assistant"),
                                    content=content,
                                )
                            elif output_type in {"tool_use", "computer_call", "function_call"}:
                                # Tool invocation: several provider schemas are accepted,
                                # hence the chained .get() fallbacks for id/name/input.
                                logger.debug("Streaming tool call: %s", output)
                                call_id = output.get("id") or output.get("call_id")
                                tool_name = output.get("name") or output.get("action", {}).get("type")
                                tool_input = output.get("input") or output.get("arguments") or output.get("action")
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_call",
                                        name=tool_name,
                                        call_id=call_id,
                                        input=tool_input,
                                    )
                            elif output_type in {"tool_result", "computer_call_output", "function_call_output"}:
                                # Tool result: aggregated into the text result as well.
                                logger.debug("Streaming tool output: %s", output)
                                call_id = output.get("call_id") or output.get("id")
                                content = output.get("content") or output.get("output")
                                aggregated_text = _serialise_tool_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_output",
                                        call_id=call_id,
                                        output=content,
                                        is_error=output.get("status") == "failed" or output.get("is_error", False),
                                    )

                    logger.info("CUA task completed successfully")
                    ctx.info("CUA task completed successfully")

                    # Final screenshot taken after the agent loop finishes.
                    screenshot_image = Image(
                        format="png",
                        data=await session.computer.interface.screenshot(),
                    )
                    return (
                        "\n".join(aggregated_messages).strip() or "Task completed with no text output.",
                        screenshot_image,
                    )
                finally:
                    # Unregister the task from the session
                    await session_manager.unregister_task(session.session_id, task_id)
        except Exception as e:
            error_msg = f"Error running CUA task: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            ctx.error(error_msg)
            # Try to get a screenshot from the session if available
            try:
                if session_id:
                    async with session_manager.get_session(session_id) as session:
                        screenshot = await session.computer.interface.screenshot()
                        return (
                            f"Error during task execution: {str(e)}",
                            Image(format="png", data=screenshot),
                        )
            except Exception:
                # Best effort only — fall through to the placeholder below.
                pass
            # If we can't get a screenshot, return a placeholder
            return (
                f"Error during task execution: {str(e)}",
                Image(format="png", data=b""),
            )

    @server.tool(structured_output=False)
    async def run_multi_cua_tasks(ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False) -> Any:
        """
        Run multiple CUA tasks and return a list of (combined text, screenshot).

        Args:
            tasks: List of task descriptions to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
            concurrent: If True, run tasks concurrently. If False, run sequentially (default).
        """
        total_tasks = len(tasks)
        if total_tasks == 0:
            ctx.report_progress(1.0)
            return []

        session_manager = get_session_manager()

        if concurrent and total_tasks > 1:
            # Run tasks concurrently
            logger.info(f"Running {total_tasks} tasks concurrently")
            ctx.info(f"Running {total_tasks} tasks concurrently")

            # Create tasks with progress tracking
            # NOTE(review): with concurrent tasks these progress fractions can
            # interleave out of order — acceptable for a coarse indicator.
            async def run_task_with_progress(task_index: int, task: str) -> Tuple[int, Tuple[str, Image]]:
                ctx.report_progress(task_index / total_tasks)
                result = await run_cua_task(ctx, task, session_id)
                ctx.report_progress((task_index + 1) / total_tasks)
                return task_index, result

            # Create all task coroutines
            task_coroutines = [run_task_with_progress(i, task) for i, task in enumerate(tasks)]

            # Wait for all tasks to complete
            # return_exceptions=True keeps one failing task from cancelling the rest;
            # gather() preserves submission order.
            results_with_indices = await asyncio.gather(*task_coroutines, return_exceptions=True)

            # Sort results by original task order and handle exceptions
            results: List[Tuple[str, Image]] = []
            for result in results_with_indices:
                if isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {result}")
                    ctx.error(f"Task failed: {str(result)}")
                    results.append((f"Task failed: {str(result)}", Image(format="png", data=b"")))
                else:
                    _, task_result = result
                    results.append(task_result)

            return results
        else:
            # Run tasks sequentially (original behavior)
            logger.info(f"Running {total_tasks} tasks sequentially")
            ctx.info(f"Running {total_tasks} tasks sequentially")

            results: List[Tuple[str, Image]] = []
            for i, task in enumerate(tasks):
                logger.info(f"Running task {i+1}/{total_tasks}: {task}")
                ctx.info(f"Running task {i+1}/{total_tasks}: {task}")
                ctx.report_progress(i / total_tasks)
                task_result = await run_cua_task(ctx, task, session_id)
                results.append(task_result)
                ctx.report_progress((i + 1) / total_tasks)

            return results

    @server.tool(structured_output=False)
    async def get_session_stats(ctx: Context) -> Dict[str, Any]:
        """
        Get statistics about active sessions and resource usage.
        """
        session_manager = get_session_manager()
        return session_manager.get_session_stats()

    @server.tool(structured_output=False)
    async def cleanup_session(ctx: Context, session_id: str) -> str:
        """
        Cleanup a specific session and release its resources.

        Args:
            session_id: The session ID to cleanup
        """
        session_manager = get_session_manager()
        await session_manager.cleanup_session(session_id)
        return f"Session {session_id} cleanup initiated"

    return server


# Module-level server instance; built once at import time.
server = serve()


async def run_server():
    """Run the MCP server with proper lifecycle management."""
    session_manager = None
    try:
        logger.debug("Starting MCP server...")

        # Initialize session manager
        session_manager = await initialize_session_manager()
        logger.info("Session manager initialized")

        # Set up signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            # Create a task to shutdown gracefully (handlers must not block the loop)
            asyncio.create_task(graceful_shutdown())

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Start the server
        logger.info("Starting FastMCP server...")
        # Use run_stdio_async directly instead of server.run() to avoid nested event loops
        await server.run_stdio_async()
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        raise
    finally:
        # Ensure cleanup happens
        if session_manager:
            logger.info("Shutting down session manager...")
            await shutdown_session_manager()


async def graceful_shutdown():
    """Gracefully shutdown the server and all sessions."""
    logger.info("Initiating graceful shutdown...")
    try:
        await shutdown_session_manager()
        logger.info("Graceful shutdown completed")
    except Exception as e:
        logger.error(f"Error during graceful shutdown: {e}")
    finally:
        # Exit the process
        # (local import is redundant — os is already imported at module top —
        #  and os._exit skips atexit/finally handlers; deliberate hard stop.)
        import os
        os._exit(0)


def main():
    """Run the MCP server with proper async lifecycle management."""
    try:
        # Use anyio.run instead of asyncio.run to avoid nested event loop issues
        anyio.run(run_server)
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
```