This is page 9 of 16. Use http://codebase.md/trycua/cua?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .all-contributorsrc
├── .cursorignore
├── .devcontainer
│   ├── devcontainer.json
│   ├── post-install.sh
│   └── README.md
├── .dockerignore
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── scripts
│   │   ├── get_pyproject_version.py
│   │   └── tests
│   │       ├── __init__.py
│   │       ├── README.md
│   │       └── test_get_pyproject_version.py
│   └── workflows
│       ├── ci-lume.yml
│       ├── docker-publish-kasm.yml
│       ├── docker-publish-xfce.yml
│       ├── docker-reusable-publish.yml
│       ├── npm-publish-computer.yml
│       ├── npm-publish-core.yml
│       ├── publish-lume.yml
│       ├── pypi-publish-agent.yml
│       ├── pypi-publish-computer-server.yml
│       ├── pypi-publish-computer.yml
│       ├── pypi-publish-core.yml
│       ├── pypi-publish-mcp-server.yml
│       ├── pypi-publish-pylume.yml
│       ├── pypi-publish-som.yml
│       ├── pypi-reusable-publish.yml
│       └── test-validation-script.yml
├── .gitignore
├── .vscode
│   ├── docs.code-workspace
│   ├── launch.json
│   ├── libs-ts.code-workspace
│   ├── lume.code-workspace
│   ├── lumier.code-workspace
│   ├── py.code-workspace
│   └── settings.json
├── blog
│   ├── app-use.md
│   ├── assets
│   │   ├── composite-agents.png
│   │   ├── docker-ubuntu-support.png
│   │   ├── hack-booth.png
│   │   ├── hack-closing-ceremony.jpg
│   │   ├── hack-cua-ollama-hud.jpeg
│   │   ├── hack-leaderboard.png
│   │   ├── hack-the-north.png
│   │   ├── hack-winners.jpeg
│   │   ├── hack-workshop.jpeg
│   │   ├── hud-agent-evals.png
│   │   └── trajectory-viewer.jpeg
│   ├── bringing-computer-use-to-the-web.md
│   ├── build-your-own-operator-on-macos-1.md
│   ├── build-your-own-operator-on-macos-2.md
│   ├── composite-agents.md
│   ├── cua-hackathon.md
│   ├── hack-the-north.md
│   ├── hud-agent-evals.md
│   ├── human-in-the-loop.md
│   ├── introducing-cua-cloud-containers.md
│   ├── lume-to-containerization.md
│   ├── sandboxed-python-execution.md
│   ├── training-computer-use-models-trajectories-1.md
│   ├── trajectory-viewer.md
│   ├── ubuntu-docker-support.md
│   └── windows-sandbox.md
├── CONTRIBUTING.md
├── Development.md
├── Dockerfile
├── docs
│   ├── .gitignore
│   ├── .prettierrc
│   ├── content
│   │   └── docs
│   │       ├── agent-sdk
│   │       │   ├── agent-loops.mdx
│   │       │   ├── benchmarks
│   │       │   │   ├── index.mdx
│   │       │   │   ├── interactive.mdx
│   │       │   │   ├── introduction.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── osworld-verified.mdx
│   │       │   │   ├── screenspot-pro.mdx
│   │       │   │   └── screenspot-v2.mdx
│   │       │   ├── callbacks
│   │       │   │   ├── agent-lifecycle.mdx
│   │       │   │   ├── cost-saving.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── logging.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── pii-anonymization.mdx
│   │       │   │   └── trajectories.mdx
│   │       │   ├── chat-history.mdx
│   │       │   ├── custom-computer-handlers.mdx
│   │       │   ├── custom-tools.mdx
│   │       │   ├── customizing-computeragent.mdx
│   │       │   ├── integrations
│   │       │   │   ├── hud.mdx
│   │       │   │   └── meta.json
│   │       │   ├── message-format.mdx
│   │       │   ├── meta.json
│   │       │   ├── migration-guide.mdx
│   │       │   ├── prompt-caching.mdx
│   │       │   ├── supported-agents
│   │       │   │   ├── composed-agents.mdx
│   │       │   │   ├── computer-use-agents.mdx
│   │       │   │   ├── grounding-models.mdx
│   │       │   │   ├── human-in-the-loop.mdx
│   │       │   │   └── meta.json
│   │       │   ├── supported-model-providers
│   │       │   │   ├── index.mdx
│   │       │   │   └── local-models.mdx
│   │       │   └── usage-tracking.mdx
│   │       ├── computer-sdk
│   │       │   ├── cloud-vm-management.mdx
│   │       │   ├── commands.mdx
│   │       │   ├── computer-ui.mdx
│   │       │   ├── computers.mdx
│   │       │   ├── meta.json
│   │       │   └── sandboxed-python.mdx
│   │       ├── index.mdx
│   │       ├── libraries
│   │       │   ├── agent
│   │       │   │   └── index.mdx
│   │       │   ├── computer
│   │       │   │   └── index.mdx
│   │       │   ├── computer-server
│   │       │   │   ├── Commands.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── REST-API.mdx
│   │       │   │   └── WebSocket-API.mdx
│   │       │   ├── core
│   │       │   │   └── index.mdx
│   │       │   ├── lume
│   │       │   │   ├── cli-reference.mdx
│   │       │   │   ├── faq.md
│   │       │   │   ├── http-api.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── meta.json
│   │       │   │   └── prebuilt-images.mdx
│   │       │   ├── lumier
│   │       │   │   ├── building-lumier.mdx
│   │       │   │   ├── docker-compose.mdx
│   │       │   │   ├── docker.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   └── meta.json
│   │       │   ├── mcp-server
│   │       │   │   ├── client-integrations.mdx
│   │       │   │   ├── configuration.mdx
│   │       │   │   ├── index.mdx
│   │       │   │   ├── installation.mdx
│   │       │   │   ├── llm-integrations.mdx
│   │       │   │   ├── meta.json
│   │       │   │   ├── tools.mdx
│   │       │   │   └── usage.mdx
│   │       │   └── som
│   │       │       ├── configuration.mdx
│   │       │       └── index.mdx
│   │       ├── meta.json
│   │       ├── quickstart-cli.mdx
│   │       ├── quickstart-devs.mdx
│   │       └── telemetry.mdx
│   ├── next.config.mjs
│   ├── package-lock.json
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── postcss.config.mjs
│   ├── public
│   │   └── img
│   │       ├── agent_gradio_ui.png
│   │       ├── agent.png
│   │       ├── cli.png
│   │       ├── computer.png
│   │       ├── som_box_threshold.png
│   │       └── som_iou_threshold.png
│   ├── README.md
│   ├── source.config.ts
│   ├── src
│   │   ├── app
│   │   │   ├── (home)
│   │   │   │   ├── [[...slug]]
│   │   │   │   │   └── page.tsx
│   │   │   │   └── layout.tsx
│   │   │   ├── api
│   │   │   │   └── search
│   │   │   │       └── route.ts
│   │   │   ├── favicon.ico
│   │   │   ├── global.css
│   │   │   ├── layout.config.tsx
│   │   │   ├── layout.tsx
│   │   │   ├── llms.mdx
│   │   │   │   └── [[...slug]]
│   │   │   │       └── route.ts
│   │   │   └── llms.txt
│   │   │       └── route.ts
│   │   ├── assets
│   │   │   ├── discord-black.svg
│   │   │   ├── discord-white.svg
│   │   │   ├── logo-black.svg
│   │   │   └── logo-white.svg
│   │   ├── components
│   │   │   ├── iou.tsx
│   │   │   └── mermaid.tsx
│   │   ├── lib
│   │   │   ├── llms.ts
│   │   │   └── source.ts
│   │   └── mdx-components.tsx
│   └── tsconfig.json
├── examples
│   ├── agent_examples.py
│   ├── agent_ui_examples.py
│   ├── cloud_api_examples.py
│   ├── computer_examples_windows.py
│   ├── computer_examples.py
│   ├── computer_ui_examples.py
│   ├── computer-example-ts
│   │   ├── .env.example
│   │   ├── .gitignore
│   │   ├── .prettierrc
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── README.md
│   │   ├── src
│   │   │   ├── helpers.ts
│   │   │   └── index.ts
│   │   └── tsconfig.json
│   ├── docker_examples.py
│   ├── evals
│   │   ├── hud_eval_examples.py
│   │   └── wikipedia_most_linked.txt
│   ├── pylume_examples.py
│   ├── sandboxed_functions_examples.py
│   ├── som_examples.py
│   ├── utils.py
│   └── winsandbox_example.py
├── img
│   ├── agent_gradio_ui.png
│   ├── agent.png
│   ├── cli.png
│   ├── computer.png
│   ├── logo_black.png
│   └── logo_white.png
├── libs
│   ├── kasm
│   │   ├── Dockerfile
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── src
│   │       └── ubuntu
│   │           └── install
│   │               └── firefox
│   │                   ├── custom_startup.sh
│   │                   ├── firefox.desktop
│   │                   └── install_firefox.sh
│   ├── lume
│   │   ├── .cursorignore
│   │   ├── CONTRIBUTING.md
│   │   ├── Development.md
│   │   ├── img
│   │   │   └── cli.png
│   │   ├── Package.resolved
│   │   ├── Package.swift
│   │   ├── README.md
│   │   ├── resources
│   │   │   └── lume.entitlements
│   │   ├── scripts
│   │   │   ├── build
│   │   │   │   ├── build-debug.sh
│   │   │   │   ├── build-release-notarized.sh
│   │   │   │   └── build-release.sh
│   │   │   └── install.sh
│   │   ├── src
│   │   │   ├── Commands
│   │   │   │   ├── Clone.swift
│   │   │   │   ├── Config.swift
│   │   │   │   ├── Create.swift
│   │   │   │   ├── Delete.swift
│   │   │   │   ├── Get.swift
│   │   │   │   ├── Images.swift
│   │   │   │   ├── IPSW.swift
│   │   │   │   ├── List.swift
│   │   │   │   ├── Logs.swift
│   │   │   │   ├── Options
│   │   │   │   │   └── FormatOption.swift
│   │   │   │   ├── Prune.swift
│   │   │   │   ├── Pull.swift
│   │   │   │   ├── Push.swift
│   │   │   │   ├── Run.swift
│   │   │   │   ├── Serve.swift
│   │   │   │   ├── Set.swift
│   │   │   │   └── Stop.swift
│   │   │   ├── ContainerRegistry
│   │   │   │   ├── ImageContainerRegistry.swift
│   │   │   │   ├── ImageList.swift
│   │   │   │   └── ImagesPrinter.swift
│   │   │   ├── Errors
│   │   │   │   └── Errors.swift
│   │   │   ├── FileSystem
│   │   │   │   ├── Home.swift
│   │   │   │   ├── Settings.swift
│   │   │   │   ├── VMConfig.swift
│   │   │   │   ├── VMDirectory.swift
│   │   │   │   └── VMLocation.swift
│   │   │   ├── LumeController.swift
│   │   │   ├── Main.swift
│   │   │   ├── Server
│   │   │   │   ├── Handlers.swift
│   │   │   │   ├── HTTP.swift
│   │   │   │   ├── Requests.swift
│   │   │   │   ├── Responses.swift
│   │   │   │   └── Server.swift
│   │   │   ├── Utils
│   │   │   │   ├── CommandRegistry.swift
│   │   │   │   ├── CommandUtils.swift
│   │   │   │   ├── Logger.swift
│   │   │   │   ├── NetworkUtils.swift
│   │   │   │   ├── Path.swift
│   │   │   │   ├── ProcessRunner.swift
│   │   │   │   ├── ProgressLogger.swift
│   │   │   │   ├── String.swift
│   │   │   │   └── Utils.swift
│   │   │   ├── Virtualization
│   │   │   │   ├── DarwinImageLoader.swift
│   │   │   │   ├── DHCPLeaseParser.swift
│   │   │   │   ├── ImageLoaderFactory.swift
│   │   │   │   └── VMVirtualizationService.swift
│   │   │   ├── VM
│   │   │   │   ├── DarwinVM.swift
│   │   │   │   ├── LinuxVM.swift
│   │   │   │   ├── VM.swift
│   │   │   │   ├── VMDetails.swift
│   │   │   │   ├── VMDetailsPrinter.swift
│   │   │   │   ├── VMDisplayResolution.swift
│   │   │   │   └── VMFactory.swift
│   │   │   └── VNC
│   │   │       ├── PassphraseGenerator.swift
│   │   │       └── VNCService.swift
│   │   └── tests
│   │       ├── Mocks
│   │       │   ├── MockVM.swift
│   │       │   ├── MockVMVirtualizationService.swift
│   │       │   └── MockVNCService.swift
│   │       ├── VM
│   │       │   └── VMDetailsPrinterTests.swift
│   │       ├── VMTests.swift
│   │       ├── VMVirtualizationServiceTests.swift
│   │       └── VNCServiceTests.swift
│   ├── lumier
│   │   ├── .dockerignore
│   │   ├── Dockerfile
│   │   ├── README.md
│   │   └── src
│   │       ├── bin
│   │       │   └── entry.sh
│   │       ├── config
│   │       │   └── constants.sh
│   │       ├── hooks
│   │       │   └── on-logon.sh
│   │       └── lib
│   │           ├── utils.sh
│   │           └── vm.sh
│   ├── python
│   │   ├── agent
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── agent
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── adapters
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── huggingfacelocal_adapter.py
│   │   │   │   │   ├── human_adapter.py
│   │   │   │   │   ├── mlxvlm_adapter.py
│   │   │   │   │   └── models
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── generic.py
│   │   │   │   │       ├── internvl.py
│   │   │   │   │       ├── opencua.py
│   │   │   │   │       └── qwen2_5_vl.py
│   │   │   │   ├── agent.py
│   │   │   │   ├── callbacks
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── budget_manager.py
│   │   │   │   │   ├── image_retention.py
│   │   │   │   │   ├── logging.py
│   │   │   │   │   ├── operator_validator.py
│   │   │   │   │   ├── pii_anonymization.py
│   │   │   │   │   ├── prompt_instructions.py
│   │   │   │   │   ├── telemetry.py
│   │   │   │   │   └── trajectory_saver.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── computers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cua.py
│   │   │   │   │   └── custom.py
│   │   │   │   ├── decorators.py
│   │   │   │   ├── human_tool
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   ├── server.py
│   │   │   │   │   └── ui.py
│   │   │   │   ├── integrations
│   │   │   │   │   └── hud
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── agent.py
│   │   │   │   │       └── proxy.py
│   │   │   │   ├── loops
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── anthropic.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── composed_grounded.py
│   │   │   │   │   ├── gemini.py
│   │   │   │   │   ├── glm45v.py
│   │   │   │   │   ├── gta1.py
│   │   │   │   │   ├── holo.py
│   │   │   │   │   ├── internvl.py
│   │   │   │   │   ├── model_types.csv
│   │   │   │   │   ├── moondream3.py
│   │   │   │   │   ├── omniparser.py
│   │   │   │   │   ├── openai.py
│   │   │   │   │   ├── opencua.py
│   │   │   │   │   └── uitars.py
│   │   │   │   ├── proxy
│   │   │   │   │   ├── examples.py
│   │   │   │   │   └── handlers.py
│   │   │   │   ├── responses.py
│   │   │   │   ├── types.py
│   │   │   │   └── ui
│   │   │   │       ├── __init__.py
│   │   │   │       ├── __main__.py
│   │   │   │       └── gradio
│   │   │   │           ├── __init__.py
│   │   │   │           ├── app.py
│   │   │   │           └── ui_components.py
│   │   │   ├── benchmarks
│   │   │   │   ├── .gitignore
│   │   │   │   ├── contrib.md
│   │   │   │   ├── interactive.py
│   │   │   │   ├── models
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── gta1.py
│   │   │   │   ├── README.md
│   │   │   │   ├── ss-pro.py
│   │   │   │   ├── ss-v2.py
│   │   │   │   └── utils.py
│   │   │   ├── example.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer
│   │   │   │   ├── __init__.py
│   │   │   │   ├── computer.py
│   │   │   │   ├── diorama_computer.py
│   │   │   │   ├── helpers.py
│   │   │   │   ├── interface
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   ├── models.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── logger.py
│   │   │   │   ├── models.py
│   │   │   │   ├── providers
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── cloud
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── docker
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── lume
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── lume_api.py
│   │   │   │   │   ├── lumier
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   └── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── winsandbox
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       ├── provider.py
│   │   │   │   │       └── setup_script.ps1
│   │   │   │   ├── ui
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── __main__.py
│   │   │   │   │   └── gradio
│   │   │   │   │       ├── __init__.py
│   │   │   │   │       └── app.py
│   │   │   │   └── utils.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── computer-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── computer_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── cli.py
│   │   │   │   ├── diorama
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── diorama_computer.py
│   │   │   │   │   ├── diorama.py
│   │   │   │   │   ├── draw.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── safezone.py
│   │   │   │   ├── handlers
│   │   │   │   │   ├── base.py
│   │   │   │   │   ├── factory.py
│   │   │   │   │   ├── generic.py
│   │   │   │   │   ├── linux.py
│   │   │   │   │   ├── macos.py
│   │   │   │   │   └── windows.py
│   │   │   │   ├── main.py
│   │   │   │   ├── server.py
│   │   │   │   └── watchdog.py
│   │   │   ├── examples
│   │   │   │   ├── __init__.py
│   │   │   │   └── usage_example.py
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   ├── run_server.py
│   │   │   └── test_connection.py
│   │   ├── core
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── core
│   │   │   │   ├── __init__.py
│   │   │   │   └── telemetry
│   │   │   │       ├── __init__.py
│   │   │   │       └── posthog.py
│   │   │   ├── poetry.toml
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   ├── mcp-server
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── CONCURRENT_SESSIONS.md
│   │   │   ├── mcp_server
│   │   │   │   ├── __init__.py
│   │   │   │   ├── __main__.py
│   │   │   │   ├── server.py
│   │   │   │   └── session_manager.py
│   │   │   ├── pdm.lock
│   │   │   ├── pyproject.toml
│   │   │   ├── README.md
│   │   │   └── scripts
│   │   │       ├── install_mcp_server.sh
│   │   │       └── start_mcp_server.sh
│   │   ├── pylume
│   │   │   ├── __init__.py
│   │   │   ├── .bumpversion.cfg
│   │   │   ├── pylume
│   │   │   │   ├── __init__.py
│   │   │   │   ├── client.py
│   │   │   │   ├── exceptions.py
│   │   │   │   ├── lume
│   │   │   │   ├── models.py
│   │   │   │   ├── pylume.py
│   │   │   │   └── server.py
│   │   │   ├── pyproject.toml
│   │   │   └── README.md
│   │   └── som
│   │       ├── .bumpversion.cfg
│   │       ├── LICENSE
│   │       ├── poetry.toml
│   │       ├── pyproject.toml
│   │       ├── README.md
│   │       ├── som
│   │       │   ├── __init__.py
│   │       │   ├── detect.py
│   │       │   ├── detection.py
│   │       │   ├── models.py
│   │       │   ├── ocr.py
│   │       │   ├── util
│   │       │   │   └── utils.py
│   │       │   └── visualization.py
│   │       └── tests
│   │           └── test_omniparser.py
│   ├── typescript
│   │   ├── .gitignore
│   │   ├── .nvmrc
│   │   ├── agent
│   │   │   ├── examples
│   │   │   │   ├── playground-example.html
│   │   │   │   └── README.md
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── client.ts
│   │   │   │   ├── index.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   └── client.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── biome.json
│   │   ├── computer
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── computer
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── providers
│   │   │   │   │   │   ├── base.ts
│   │   │   │   │   │   ├── cloud.ts
│   │   │   │   │   │   └── index.ts
│   │   │   │   │   └── types.ts
│   │   │   │   ├── index.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── base.ts
│   │   │   │   │   ├── factory.ts
│   │   │   │   │   ├── index.ts
│   │   │   │   │   ├── linux.ts
│   │   │   │   │   ├── macos.ts
│   │   │   │   │   └── windows.ts
│   │   │   │   └── types.ts
│   │   │   ├── tests
│   │   │   │   ├── computer
│   │   │   │   │   └── cloud.test.ts
│   │   │   │   ├── interface
│   │   │   │   │   ├── factory.test.ts
│   │   │   │   │   ├── index.test.ts
│   │   │   │   │   ├── linux.test.ts
│   │   │   │   │   ├── macos.test.ts
│   │   │   │   │   └── windows.test.ts
│   │   │   │   └── setup.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── core
│   │   │   ├── .editorconfig
│   │   │   ├── .gitattributes
│   │   │   ├── .gitignore
│   │   │   ├── LICENSE
│   │   │   ├── package.json
│   │   │   ├── README.md
│   │   │   ├── src
│   │   │   │   ├── index.ts
│   │   │   │   └── telemetry
│   │   │   │       ├── clients
│   │   │   │       │   ├── index.ts
│   │   │   │       │   └── posthog.ts
│   │   │   │       └── index.ts
│   │   │   ├── tests
│   │   │   │   └── telemetry.test.ts
│   │   │   ├── tsconfig.json
│   │   │   ├── tsdown.config.ts
│   │   │   └── vitest.config.ts
│   │   ├── package.json
│   │   ├── pnpm-lock.yaml
│   │   ├── pnpm-workspace.yaml
│   │   └── README.md
│   └── xfce
│       ├── .dockerignore
│       ├── .gitignore
│       ├── Dockerfile
│       ├── README.md
│       └── src
│           ├── scripts
│           │   ├── resize-display.sh
│           │   ├── start-computer-server.sh
│           │   ├── start-novnc.sh
│           │   ├── start-vnc.sh
│           │   └── xstartup.sh
│           ├── supervisor
│           │   └── supervisord.conf
│           └── xfce-config
│               ├── helpers.rc
│               ├── xfce4-power-manager.xml
│               └── xfce4-session.xml
├── LICENSE.md
├── Makefile
├── notebooks
│   ├── agent_nb.ipynb
│   ├── blog
│   │   ├── build-your-own-operator-on-macos-1.ipynb
│   │   └── build-your-own-operator-on-macos-2.ipynb
│   ├── composite_agents_docker_nb.ipynb
│   ├── computer_nb.ipynb
│   ├── computer_server_nb.ipynb
│   ├── customizing_computeragent.ipynb
│   ├── eval_osworld.ipynb
│   ├── ollama_nb.ipynb
│   ├── pylume_nb.ipynb
│   ├── README.md
│   ├── sota_hackathon_cloud.ipynb
│   └── sota_hackathon.ipynb
├── pdm.lock
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── samples
│   └── community
│       ├── global-online
│       │   └── README.md
│       └── hack-the-north
│           └── README.md
├── scripts
│   ├── build-uv.sh
│   ├── build.ps1
│   ├── build.sh
│   ├── cleanup.sh
│   ├── playground-docker.sh
│   ├── playground.sh
│   └── run-docker-dev.sh
└── tests
    ├── pytest.ini
    ├── shell_cmd.py
    ├── test_files.py
    ├── test_mcp_server_session_management.py
    ├── test_mcp_server_streaming.py
    ├── test_shell_bash.py
    ├── test_telemetry.py
    ├── test_venv.py
    └── test_watchdog.py
```

# Files

--------------------------------------------------------------------------------
/blog/sandboxed-python-execution.md:
--------------------------------------------------------------------------------

```markdown
# Sandboxed Python Execution: Run Code Safely in Cua Containers

*Published on June 23, 2025 by Dillon DuPont*

Cua's computer-use capabilities, which we touched on in [Building your own Operator on macOS - Part 2](build-your-own-operator-on-macos-2.md), let your AI agents click, scroll, type, and interact with any desktop application. But what if your agent needs to do more than just UI automation? What if it needs to process data, make API calls, analyze images, or run complex logic alongside those UI interactions, within the same virtual environment?

That's where Cua's `@sandboxed` decorator comes in. While Cua handles the clicking and typing, sandboxed execution lets you run full Python code inside the same virtual environment. It's like giving your AI agents a programming brain to complement their clicking fingers.

Think of it as the perfect marriage: Cua handles the "what you see" (UI interactions), while sandboxed Python handles the "what you compute" (data processing, logic, API calls) – all happening in the same isolated environment.

## So, what exactly is sandboxed execution?

Cua excels at automating user interfaces – clicking buttons, filling forms, navigating applications. But modern AI agents need to do more than just UI automation. They need to process the data they collect, make intelligent decisions, call external APIs, and run sophisticated algorithms.

Sandboxed execution bridges this gap. You write a Python function, decorate it with `@sandboxed`, and it runs inside your Cua container alongside your UI automation. Your agent can now click a button, extract some data, process it with Python, and then use those results to decide what to click next.

Here's what makes this combination powerful for AI agent development:

- **Unified environment**: Your UI automation and code execution happen in the same container
- **Rich capabilities**: Combine Cua's clicking with Python's data processing, API calls, and libraries
- **Seamless integration**: Pass data between UI interactions and Python functions effortlessly
- **Cross-platform consistency**: Your Python code runs the same way across different Cua environments
- **Complete workflows**: Build agents that can both interact with apps AND process the data they collect

## The architecture behind @sandboxed

Let's jump right into an example that'll make this crystal clear:

```python
from computer.helpers import sandboxed

@sandboxed("demo_venv")
def greet_and_print(name):
    """This function runs inside the container"""
    import PyXA  # macOS-specific library
    safari = PyXA.Application("Safari")
    html = safari.current_document.source()
    print(f"Hello from inside the container, {name}!")
    return {"greeted": name, "safari_html": html}

# When called, this executes in the container
result = await greet_and_print("Cua")
```

What's happening here? When you call `greet_and_print()`, Cua extracts the function's source code, transmits it to the container, and executes it there. The result returns to you seamlessly, while the actual execution remains completely isolated.

## How does sandboxed execution work?

Cua's sandboxed execution system employs several key architectural components:

### 1. Source Code Extraction
Cua uses Python's `inspect.getsource()` to extract your function's source code and reconstruct the function definition in the remote environment.

### 2. Virtual Environment Isolation
Each sandboxed function runs in a named virtual environment within the container. This provides complete dependency isolation between different functions and their respective environments.

### 3. Data Serialization and Transport
Arguments and return values are serialized as JSON and transported between the host and container. This ensures compatibility across different Python versions and execution environments.

### 4. Comprehensive Error Handling
The system captures both successful results and exceptions, preserving stack traces and error information for debugging purposes.
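To make these pieces concrete, here is a minimal, hypothetical sketch of how a call could be marshalled: the function source is pulled out with `inspect.getsource()` and bundled with JSON-serialized arguments before being shipped to the container. The names and payload shape below are illustrative only, not Cua's actual wire format.

```python
import inspect
import json

def build_sandbox_payload(fn, args, kwargs, venv_name):
    """Hypothetical helper: package a function call for remote execution."""
    payload = {
        "venv": venv_name,                # which named virtual environment to run in (2)
        "source": inspect.getsource(fn),  # extracted function source (1)
        "func_name": fn.__name__,
        "args": list(args),               # JSON-serializable arguments (3)
        "kwargs": kwargs,
    }
    # The container side would exec() the source inside the named venv, call the
    # function, and reply with {"result": ...} or {"error": ..., "traceback": ...} (4).
    return json.dumps(payload)
```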

## Getting your sandbox ready

Setting up sandboxed execution is simple:

```python
import asyncio
from computer.computer import Computer
from computer.helpers import sandboxed, set_default_computer

async def main():
    # Fire up the computer
    computer = Computer()
    await computer.run()
    
    # Make it the default for all sandboxed functions
    set_default_computer(computer)
    
    # Install some packages in a virtual environment
    await computer.venv_install("demo_venv", ["requests", "beautifulsoup4"])

asyncio.run(main())
```

If you want to get fancy, you can specify which computer instance to use:

```python
@sandboxed("my_venv", computer=my_specific_computer)
def my_function():
    # This runs on your specified computer instance
    pass
```

## Real-world examples that actually work

### Browser automation without the headaches

Ever tried to automate a browser and had it crash your entire system? Yeah, us too. Here's how to do it safely:

```python
@sandboxed("browser_env")
def automate_browser_with_playwright():
    """Automate browser interactions using Playwright"""
    from playwright.sync_api import sync_playwright
    import time
    import base64
    from datetime import datetime
    
    try:
        with sync_playwright() as p:
            # Launch browser (visible, because why not?)
            browser = p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-dev-shm-usage']
            )
            
            page = browser.new_page()
            page.set_viewport_size({"width": 1280, "height": 720})
            
            actions = []
            screenshots = {}
            
            # Let's visit example.com and poke around
            page.goto("https://example.com")
            actions.append("Navigated to example.com")
            
            # Grab a screenshot because screenshots are cool
            screenshot_bytes = page.screenshot(full_page=True)
            screenshots["initial"] = base64.b64encode(screenshot_bytes).decode()
            
            # Get some basic info
            title = page.title()
            actions.append(f"Page title: {title}")
            
            # Find links and headings
            try:
                links = page.locator("a").all()
                link_texts = [link.text_content() for link in links[:5]]
                actions.append(f"Found {len(links)} links: {link_texts}")
                
                headings = page.locator("h1, h2, h3").all()
                heading_texts = [h.text_content() for h in headings[:3]]
                actions.append(f"Found headings: {heading_texts}")
                
            except Exception as e:
                actions.append(f"Element interaction error: {str(e)}")
            
            # Let's try a form for good measure
            try:
                page.goto("https://httpbin.org/forms/post")
                actions.append("Navigated to form page")
                
                # Fill out the form
                page.fill('input[name="custname"]', "Test User from Sandboxed Environment")
                page.fill('input[name="custtel"]', "555-0123")
                page.fill('input[name="custemail"]', "[email protected]")
                page.select_option('select[name="size"]', "large")
                
                actions.append("Filled out form fields")
                
                # Submit and see what happens
                page.click('input[type="submit"]')
                page.wait_for_load_state("networkidle")
                
                actions.append("Submitted form")
                
            except Exception as e:
                actions.append(f"Form interaction error: {str(e)}")
            
            browser.close()
            
            return {
                "actions_performed": actions,
                "screenshots": screenshots,
                "success": True
            }
            
    except Exception as e:
        return {"error": f"Browser automation failed: {str(e)}"}

# Install Playwright and its browsers
await computer.venv_install("browser_env", ["playwright"])
await computer.venv_cmd("browser_env", "playwright install chromium")

# Run the automation
result = await automate_browser_with_playwright()
print(f"Performed {len(result.get('actions_performed', []))} actions")
```

### Building code analysis agents

Want to build agents that can analyze code safely? Here's a security audit tool that won't accidentally `eval()` your system into oblivion:

```python
@sandboxed("analysis_env")
def security_audit_tool(code_snippet):
    """Analyze code for potential security issues"""
    import ast
    import re
    
    issues = []
    
    # Check for the usual suspects
    dangerous_patterns = [
        (r'eval\s*\(', "Use of eval() function"),
        (r'exec\s*\(', "Use of exec() function"),
        (r'__import__\s*\(', "Dynamic import usage"),
        (r'subprocess\.', "Subprocess usage"),
        (r'os\.system\s*\(', "OS system call"),
    ]
    
    for pattern, description in dangerous_patterns:
        if re.search(pattern, code_snippet):
            issues.append(description)
    
    # Get fancy with AST analysis
    try:
        tree = ast.parse(code_snippet)
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                if hasattr(node.func, 'id'):
                    if node.func.id in ['eval', 'exec', 'compile']:
                        issues.append(f"Dangerous function call: {node.func.id}")
    except SyntaxError:
        issues.append("Syntax error in code")
    
    return {
        "security_issues": issues,
        "risk_level": "HIGH" if len(issues) > 2 else "MEDIUM" if issues else "LOW"
    }

# Test it on some sketchy code
audit_result = await security_audit_tool("eval(user_input)")
print(f"Security audit: {audit_result}")
```

### Desktop automation in the cloud

Here's where things get really interesting. Cua Cloud Sandbox comes with full desktop environments, so you can automate GUIs:

```python
@sandboxed("desktop_env")
def take_screenshot_and_analyze():
    """Take a screenshot and analyze the desktop"""
    import io
    import base64
    from PIL import ImageGrab
    from datetime import datetime
    
    try:
        # Grab the screen
        screenshot = ImageGrab.grab()
        
        # Convert to base64 for easy transport
        buffer = io.BytesIO()
        screenshot.save(buffer, format='PNG')
        screenshot_data = base64.b64encode(buffer.getvalue()).decode()
        
        # Get some basic info
        screen_info = {
            "size": screenshot.size,
            "mode": screenshot.mode,
            "timestamp": datetime.now().isoformat()
        }
        
        # Analyze the colors (because why not?)
        colors = screenshot.getcolors(maxcolors=256*256*256)
        dominant_color = max(colors, key=lambda x: x[0])[1] if colors else None
        
        return {
            "screenshot_base64": screenshot_data,
            "screen_info": screen_info,
            "dominant_color": dominant_color,
            "unique_colors": len(colors) if colors else 0
        }
        
    except Exception as e:
        return {"error": f"Screenshot failed: {str(e)}"}

# Install the dependencies
await computer.venv_install("desktop_env", ["Pillow"])

# Take and analyze a screenshot
result = await take_screenshot_and_analyze()
print("Desktop analysis complete!")
```

## Pro tips for sandboxed success

### Keep it self-contained
Always put your imports inside the function. Trust us on this one:

```python
@sandboxed("good_env")
def good_function():
    import os  # Import inside the function
    import json
    
    # Your code here
    return {"result": "success"}
```

### Install dependencies first
Don't forget to install packages before using them:

```python
# Install first
await computer.venv_install("my_env", ["pandas", "numpy", "matplotlib"])

@sandboxed("my_env")
def data_analysis():
    import pandas as pd
    import numpy as np
    # Now you can use them
```

### Use descriptive environment names
Future you will thank you:

```python
@sandboxed("data_processing_env")
def process_data(): pass

@sandboxed("web_scraping_env") 
def scrape_site(): pass

@sandboxed("ml_training_env")
def train_model(): pass
```

### Always handle errors gracefully
Things break. Plan for it:

```python
@sandboxed("robust_env")
def robust_function(data):
    try:
        result = process_data(data)
        return {"success": True, "result": result}
    except Exception as e:
        return {"success": False, "error": str(e)}
```

## What about performance?

Let's be honest – there's some overhead here. Code needs to be serialized, sent over the network, and executed remotely. But for most use cases, the benefits far outweigh the costs.

If you're building something performance-critical, consider:
- Batching multiple operations into a single sandboxed function (see the sketch after this list)
- Minimizing data transfer between host and container
- Using persistent virtual environments
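
For example, one batched sandboxed call can do all the network work inside the container and send back a single compact result. This is a sketch that assumes `requests` and `beautifulsoup4` have already been installed into the environment with `computer.venv_install`:

```python
@sandboxed("data_processing_env")
def fetch_and_summarize(urls):
    """Fetch several pages in one sandboxed call and return one small result."""
    import requests
    from bs4 import BeautifulSoup

    summaries = []
    for url in urls:
        html = requests.get(url, timeout=10).text
        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.string if soup.title else ""
        # Return only the small bits we need, not the full HTML
        summaries.append({"url": url, "title": title, "bytes": len(html)})
    return summaries

# One round trip instead of one per URL:
# result = await fetch_and_summarize(["https://example.com", "https://httpbin.org"])
```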

## The security angle

This is where sandboxed execution really shines:

1. **Complete process isolation** – code runs in a separate container
2. **File system protection** – limited access to your host files
3. **Network isolation** – controlled network access
4. **Clean environments** – no package conflicts or pollution
5. **Resource limits** – container-level constraints keep things in check

## Ready to get started?

The `@sandboxed` decorator is one of those features that sounds simple but opens up a world of possibilities. Whether you're testing sketchy code, building AI agents, or just want to keep your development environment pristine, it's got you covered.

Give it a try in your next Cua project and see how liberating it feels to run code without fear!

Happy coding (safely)!

---

*Want to dive deeper? Check out our [sandboxed functions examples](https://github.com/trycua/cua/blob/main/examples/sandboxed_functions_examples.py) and [virtual environment tests](https://github.com/trycua/cua/blob/main/tests/venv.py) on GitHub. Questions? Come chat with us on Discord!*

```

--------------------------------------------------------------------------------
/libs/lume/src/FileSystem/Settings.swift:
--------------------------------------------------------------------------------

```swift
import Foundation

/// Manages the application settings using a config file
struct LumeSettings: Codable, Sendable {
    var vmLocations: [VMLocation]
    var defaultLocationName: String
    var cacheDirectory: String
    var cachingEnabled: Bool

    var defaultLocation: VMLocation? {
        vmLocations.first { $0.name == defaultLocationName }
    }

    // For backward compatibility
    var homeDirectory: String {
        defaultLocation?.path ?? "~/.lume"
    }

    static let defaultSettings = LumeSettings(
        vmLocations: [
            VMLocation(name: "default", path: "~/.lume")
        ],
        defaultLocationName: "default",
        cacheDirectory: "~/.lume/cache",
        cachingEnabled: true
    )

    /// Gets all locations sorted by name
    var sortedLocations: [VMLocation] {
        vmLocations.sorted { $0.name < $1.name }
    }
}

final class SettingsManager: @unchecked Sendable {
    // MARK: - Constants

    private enum Constants {
        // Default path for config
        static let fallbackConfigDir = "~/.config/lume"
        static let configFileName = "config.yaml"
    }

    // MARK: - Properties

    static let shared = SettingsManager()
    private let fileManager: FileManager

    // Get the config directory following XDG spec
    private var configDir: String {
        // Check XDG_CONFIG_HOME environment variable first
        if let xdgConfigHome = ProcessInfo.processInfo.environment["XDG_CONFIG_HOME"] {
            return "\(xdgConfigHome)/lume"
        }
        // Fall back to default
        return (Constants.fallbackConfigDir as NSString).expandingTildeInPath
    }

    // Path to config file
    private var configFilePath: String {
        return "\(configDir)/\(Constants.configFileName)"
    }

    // MARK: - Initialization

    init(fileManager: FileManager = .default) {
        self.fileManager = fileManager
        ensureConfigDirectoryExists()
    }

    // MARK: - Settings Access

    func getSettings() -> LumeSettings {
        if let settings = readSettingsFromFile() {
            return settings
        }

        // No settings file found, use defaults
        let defaultSettings = LumeSettings(
            vmLocations: [
                VMLocation(name: "default", path: "~/.lume")
            ],
            defaultLocationName: "default",
            cacheDirectory: "~/.lume/cache",
            cachingEnabled: true
        )

        // Try to save default settings
        try? saveSettings(defaultSettings)

        return defaultSettings
    }

    func saveSettings(_ settings: LumeSettings) throws {
        try fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)

        // Create a human-readable YAML-like configuration file
        var yamlContent = "# Lume Configuration\n\n"

        // Default location
        yamlContent += "defaultLocationName: \"\(settings.defaultLocationName)\"\n"

        // Cache directory
        yamlContent += "cacheDirectory: \"\(settings.cacheDirectory)\"\n"

        // Caching enabled flag
        yamlContent += "cachingEnabled: \(settings.cachingEnabled)\n"

        // VM locations
        yamlContent += "\n# VM Locations\nvmLocations:\n"
        for location in settings.vmLocations {
            yamlContent += "  - name: \"\(location.name)\"\n"
            yamlContent += "    path: \"\(location.path)\"\n"
        }

        // Write YAML content to file
        try yamlContent.write(
            to: URL(fileURLWithPath: configFilePath), atomically: true, encoding: .utf8)
    }

    // MARK: - VM Location Management

    func addLocation(_ location: VMLocation) throws {
        var settings = getSettings()

        // Validate location name (alphanumeric, dash, underscore)
        let nameRegex = try NSRegularExpression(pattern: "^[a-zA-Z0-9_-]+$")
        let nameRange = NSRange(location.name.startIndex..., in: location.name)
        if nameRegex.firstMatch(in: location.name, range: nameRange) == nil {
            throw VMLocationError.invalidLocationName(name: location.name)
        }

        // Check for duplicate name
        if settings.vmLocations.contains(where: { $0.name == location.name }) {
            throw VMLocationError.duplicateLocationName(name: location.name)
        }

        // Validate location path
        try location.validate()

        // Add location
        settings.vmLocations.append(location)
        try saveSettings(settings)
    }

    func removeLocation(name: String) throws {
        var settings = getSettings()

        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Prevent removing default location
        if name == settings.defaultLocationName {
            throw VMLocationError.defaultLocationCannotBeRemoved(name: name)
        }

        // Remove location
        settings.vmLocations.removeAll(where: { $0.name == name })
        try saveSettings(settings)
    }

    func setDefaultLocation(name: String) throws {
        var settings = getSettings()

        // Check location exists
        guard settings.vmLocations.contains(where: { $0.name == name }) else {
            throw VMLocationError.locationNotFound(name: name)
        }

        // Set default
        settings.defaultLocationName = name
        try saveSettings(settings)
    }

    func getLocation(name: String) throws -> VMLocation {
        let settings = getSettings()

        if let location = settings.vmLocations.first(where: { $0.name == name }) {
            return location
        }

        throw VMLocationError.locationNotFound(name: name)
    }

    // MARK: - Legacy Home Directory Compatibility

    func setHomeDirectory(path: String) throws {
        var settings = getSettings()

        let defaultLocation = VMLocation(name: "default", path: path)
        try defaultLocation.validate()

        // Replace default location
        if let index = settings.vmLocations.firstIndex(where: { $0.name == "default" }) {
            settings.vmLocations[index] = defaultLocation
        } else {
            settings.vmLocations.append(defaultLocation)
            settings.defaultLocationName = "default"
        }

        try saveSettings(settings)
    }

    // MARK: - Cache Directory Management

    func setCacheDirectory(path: String) throws {
        var settings = getSettings()

        // Validate path
        let expandedPath = (path as NSString).expandingTildeInPath
        var isDir: ObjCBool = false

        // If directory exists, check if it's writable
        if fileManager.fileExists(atPath: expandedPath, isDirectory: &isDir) {
            if !isDir.boolValue {
                throw SettingsError.notADirectory(path: expandedPath)
            }

            if !fileManager.isWritableFile(atPath: expandedPath) {
                throw SettingsError.directoryNotWritable(path: expandedPath)
            }
        } else {
            // Try to create the directory
            do {
                try fileManager.createDirectory(
                    atPath: expandedPath,
                    withIntermediateDirectories: true
                )
            } catch {
                throw SettingsError.directoryCreationFailed(path: expandedPath, error: error)
            }
        }

        // Update settings
        settings.cacheDirectory = path
        try saveSettings(settings)
    }

    func getCacheDirectory() -> String {
        return getSettings().cacheDirectory
    }

    func setCachingEnabled(_ enabled: Bool) throws {
        var settings = getSettings()
        settings.cachingEnabled = enabled
        try saveSettings(settings)
    }

    func isCachingEnabled() -> Bool {
        return getSettings().cachingEnabled
    }

    // MARK: - Private Helpers

    private func ensureConfigDirectoryExists() {
        try? fileManager.createDirectory(atPath: configDir, withIntermediateDirectories: true)
    }

    private func readSettingsFromFile() -> LumeSettings? {
        // Read from YAML file
        if fileExists(at: configFilePath) {
            do {
                let yamlString = try String(
                    contentsOf: URL(fileURLWithPath: configFilePath), encoding: .utf8)
                return parseYamlSettings(yamlString)
            } catch {
                Logger.error(
                    "Failed to read settings from YAML file",
                    metadata: ["error": error.localizedDescription]
                )
            }
        }
        return nil
    }

    private func parseYamlSettings(_ yamlString: String) -> LumeSettings? {
        // This is a very basic YAML parser for our specific config format
        // A real implementation would use a proper YAML library

        var defaultLocationName = "default"
        var cacheDirectory = "~/.lume/cache"
        var cachingEnabled = true  // default to true for backward compatibility
        var vmLocations: [VMLocation] = []

        var inLocationsSection = false
        var currentLocation: (name: String?, path: String?) = (nil, nil)

        let lines = yamlString.split(separator: "\n")

        for (_, line) in lines.enumerated() {
            let trimmedLine = line.trimmingCharacters(in: .whitespaces)

            // Skip comments and empty lines
            if trimmedLine.hasPrefix("#") || trimmedLine.isEmpty {
                continue
            }

            // Check for section marker
            if trimmedLine == "vmLocations:" {
                inLocationsSection = true
                continue
            }

            // In the locations section, handle line indentation more carefully
            if inLocationsSection {
                if trimmedLine.hasPrefix("-") || trimmedLine.contains("- name:") {
                    // Process the previous location before starting a new one
                    if let name = currentLocation.name, let path = currentLocation.path {
                        vmLocations.append(VMLocation(name: name, path: path))
                    }
                    currentLocation = (nil, nil)
                }

                // Process the key-value pairs within a location
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)

                    if key.hasSuffix("name") {
                        currentLocation.name = value
                    } else if key.hasSuffix("path") {
                        currentLocation.path = value
                    }
                }
            } else {
                // Process top-level keys outside the locations section
                if let colonIndex = trimmedLine.firstIndex(of: ":") {
                    let key = trimmedLine[..<colonIndex].trimmingCharacters(in: .whitespaces)
                    let rawValue = trimmedLine[trimmedLine.index(after: colonIndex)...]
                        .trimmingCharacters(in: .whitespaces)
                    let value = extractValueFromYaml(rawValue)

                    if key == "defaultLocationName" {
                        defaultLocationName = value
                    } else if key == "cacheDirectory" {
                        cacheDirectory = value
                    } else if key == "cachingEnabled" {
                        cachingEnabled = value.lowercased() == "true"
                    }
                }
            }
        }

        // Don't forget to add the last location
        if let name = currentLocation.name, let path = currentLocation.path {
            vmLocations.append(VMLocation(name: name, path: path))
        }

        // Ensure at least one location exists
        if vmLocations.isEmpty {
            vmLocations.append(VMLocation(name: "default", path: "~/.lume"))
        }

        return LumeSettings(
            vmLocations: vmLocations,
            defaultLocationName: defaultLocationName,
            cacheDirectory: cacheDirectory,
            cachingEnabled: cachingEnabled
        )
    }

    // Helper method to extract a value from YAML, handling quotes
    private func extractValueFromYaml(_ rawValue: String) -> String {
        if rawValue.hasPrefix("\"") && rawValue.hasSuffix("\"") && rawValue.count >= 2 {
            // Remove the surrounding quotes
            let startIndex = rawValue.index(after: rawValue.startIndex)
            let endIndex = rawValue.index(before: rawValue.endIndex)
            return String(rawValue[startIndex..<endIndex])
        }
        return rawValue
    }

    // Helper method to output debug information about the current settings
    func debugSettings() -> String {
        let settings = getSettings()

        var output = "Current Settings:\n"
        output += "- Default VM storage: \(settings.defaultLocationName)\n"
        output += "- Cache directory: \(settings.cacheDirectory)\n"
        output += "- VM Locations (\(settings.vmLocations.count)):\n"

        for (i, location) in settings.vmLocations.enumerated() {
            let isDefault = location.name == settings.defaultLocationName
            let defaultMark = isDefault ? " (default)" : ""
            output += "  \(i+1). \(location.name): \(location.path)\(defaultMark)\n"
        }

        // Also add raw file content
        if fileExists(at: configFilePath) {
            if let content = try? String(contentsOf: URL(fileURLWithPath: configFilePath)) {
                output += "\nRaw YAML file content:\n"
                output += content
            }
        }

        return output
    }

    private func fileExists(at path: String) -> Bool {
        fileManager.fileExists(atPath: path)
    }
}

// MARK: - Errors

enum SettingsError: Error, LocalizedError {
    case notADirectory(path: String)
    case directoryNotWritable(path: String)
    case directoryCreationFailed(path: String, error: Error)

    var errorDescription: String? {
        switch self {
        case .notADirectory(let path):
            return "Path is not a directory: \(path)"
        case .directoryNotWritable(let path):
            return "Directory is not writable: \(path)"
        case .directoryCreationFailed(let path, let error):
            return "Failed to create directory at \(path): \(error.localizedDescription)"
        }
    }
}

```

--------------------------------------------------------------------------------
/libs/python/agent/agent/adapters/mlxvlm_adapter.py:
--------------------------------------------------------------------------------

```python
import asyncio
import functools
import warnings
import io
import base64
import math
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, AsyncIterator, Dict, List, Any, Optional, Tuple, cast
from PIL import Image
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from litellm.llms.custom_llm import CustomLLM
from litellm import completion, acompletion

# Try to import MLX dependencies
try:
    import mlx.core as mx
    from mlx_vlm import load, generate
    from mlx_vlm.prompt_utils import apply_chat_template
    from mlx_vlm.utils import load_config
    from transformers.tokenization_utils import PreTrainedTokenizer
    MLX_AVAILABLE = True
except ImportError:
    MLX_AVAILABLE = False

# Constants for smart_resize
IMAGE_FACTOR = 28
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 16384 * 28 * 28
MAX_RATIO = 200

def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor

def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor

def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor

def smart_resize(
    height: int, width: int, factor: int = IMAGE_FACTOR, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS
) -> tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:

    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.
    """
    if max(height, width) / min(height, width) > MAX_RATIO:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}"
        )
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = floor_by_factor(height / beta, factor)
        w_bar = floor_by_factor(width / beta, factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar


class MLXVLMAdapter(CustomLLM):
    """MLX VLM Adapter for running vision-language models locally using MLX."""
    
    def __init__(self, **kwargs):
        """Initialize the adapter.
        
        Args:
            **kwargs: Additional arguments
        """
        super().__init__()
        
        self.models = {}  # Cache for loaded models
        self.processors = {}  # Cache for loaded processors
        self.configs = {}  # Cache for loaded configs
        self._executor = ThreadPoolExecutor(max_workers=1)  # Single thread pool
        
    def _load_model_and_processor(self, model_name: str):
        """Load model and processor if not already cached.
        
        Args:
            model_name: Name of the model to load
            
        Returns:
            Tuple of (model, processor, config)
        """
        if not MLX_AVAILABLE:
            raise ImportError("MLX VLM dependencies not available. Please install mlx-vlm.")
        
        if model_name not in self.models:
            # Load model and processor
            model_obj, processor = load(
                model_name, 
                processor_kwargs={"min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS}
            )
            config = load_config(model_name)
            
            # Cache them
            self.models[model_name] = model_obj
            self.processors[model_name] = processor
            self.configs[model_name] = config
            
        return self.models[model_name], self.processors[model_name], self.configs[model_name]
    
    def _process_coordinates(self, text: str, original_size: Tuple[int, int], model_size: Tuple[int, int]) -> str:
        """Process coordinates in box tokens based on image resizing using smart_resize approach.
        
        Args:
            text: Text containing box tokens
            original_size: Original image size (width, height)
            model_size: Model processed image size (width, height)
            
        Returns:
            Text with processed coordinates
        """
        # Find all box tokens
        box_pattern = r"<\|box_start\|>\((\d+),\s*(\d+)\)<\|box_end\|>"
        
        def process_coords(match):
            model_x, model_y = int(match.group(1)), int(match.group(2))
            # Scale coordinates from model space to original image space
            # Both original_size and model_size are in (width, height) format
            new_x = int(model_x * original_size[0] / model_size[0])  # Width
            new_y = int(model_y * original_size[1] / model_size[1])  # Height
            return f"<|box_start|>({new_x},{new_y})<|box_end|>"
        
        return re.sub(box_pattern, process_coords, text)
    
    def _convert_messages(self, messages: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Image.Image], Dict[int, Tuple[int, int]], Dict[int, Tuple[int, int]]]:
        """Convert OpenAI format messages to MLX VLM format and extract images.
        
        Args:
            messages: Messages in OpenAI format
            
        Returns:
            Tuple of (processed_messages, images, original_sizes, model_sizes)
        """
        processed_messages = []
        images = []
        original_sizes = {}  # Track original sizes of images for coordinate mapping
        model_sizes = {}  # Track model processed sizes
        image_index = 0
        
        for message in messages:
            processed_message = {
                "role": message["role"],
                "content": []
            }
            
            content = message.get("content", [])
            if isinstance(content, str):
                # Simple text content
                processed_message["content"] = content
            elif isinstance(content, list):
                # Multi-modal content
                processed_content = []
                for item in content:
                    if item.get("type") == "text":
                        processed_content.append({
                            "type": "text",
                            "text": item.get("text", "")
                        })
                    elif item.get("type") == "image_url":
                        image_url = item.get("image_url", {}).get("url", "")
                        pil_image = None
                        
                        if image_url.startswith("data:image/"):
                            # Extract base64 data
                            base64_data = image_url.split(',')[1]
                            # Convert base64 to PIL Image
                            image_data = base64.b64decode(base64_data)
                            pil_image = Image.open(io.BytesIO(image_data))
                        else:
                            # Handle file path or URL
                            pil_image = Image.open(image_url)
                        
                        # Store original image size for coordinate mapping
                        original_size = pil_image.size
                        original_sizes[image_index] = original_size
                        
                        # Use smart_resize to determine model size
                        # Note: smart_resize expects (height, width) but PIL gives (width, height)
                        height, width = original_size[1], original_size[0]
                        new_height, new_width = smart_resize(height, width)
                        # Store model size in (width, height) format for consistent coordinate processing
                        model_sizes[image_index] = (new_width, new_height)
                        
                        # Resize the image using the calculated dimensions from smart_resize
                        resized_image = pil_image.resize((new_width, new_height))
                        images.append(resized_image)
                        
                        # Add image placeholder to content
                        processed_content.append({
                            "type": "image"
                        })
                        
                        image_index += 1
                
                processed_message["content"] = processed_content
            
            processed_messages.append(processed_message)
        
        return processed_messages, images, original_sizes, model_sizes
    
    def _generate(self, **kwargs) -> str:
        """Generate response using the local MLX VLM model.
        
        Args:
            **kwargs: Keyword arguments containing messages and model info
            
        Returns:
            Generated text response
        """
        messages = kwargs.get('messages', [])
        model_name = kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')
        max_tokens = kwargs.get('max_tokens', 128)
        
        # Warn about ignored kwargs
        ignored_kwargs = set(kwargs.keys()) - {'messages', 'model', 'max_tokens'}
        if ignored_kwargs:
            warnings.warn(f"Ignoring unsupported kwargs: {ignored_kwargs}")
        
        # Load model and processor
        model, processor, config = self._load_model_and_processor(model_name)
        
        # Convert messages and extract images
        processed_messages, images, original_sizes, model_sizes = self._convert_messages(messages)
        
        # Process user text input with box coordinates after image processing
        # Swap original_size and model_size arguments for inverse transformation
        for msg_idx, msg in enumerate(processed_messages):
            if msg.get("role") == "user" and isinstance(msg.get("content"), str):
                content = msg.get("content", "")
                if "<|box_start|>" in content and original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes:
                    orig_size = original_sizes[0]
                    model_size = model_sizes[0]
                    # Swap arguments to perform inverse transformation for user input
                    processed_messages[msg_idx]["content"] = self._process_coordinates(content, model_size, orig_size)
        
        try:
            # Format prompt according to model requirements using the processor directly
            prompt = processor.apply_chat_template(
                processed_messages,
                tokenize=False,
                add_generation_prompt=True,
                return_tensors='pt'
            )
            tokenizer = cast(PreTrainedTokenizer, processor)
            
            # Generate response
            text_content, usage = generate(
                model, 
                tokenizer, 
                str(prompt), 
                images, # type: ignore
                verbose=False,
                max_tokens=max_tokens
            )
            
        except Exception as e:
            raise RuntimeError(f"Error generating response: {str(e)}") from e
        
        # Process coordinates in the response back to original image space
        if original_sizes and model_sizes and 0 in original_sizes and 0 in model_sizes:
            # Get original image size and model size (using the first image)
            orig_size = original_sizes[0]
            model_size = model_sizes[0]
            
            # Check if output contains box tokens that need processing
            if "<|box_start|>" in text_content:
                # Process coordinates from model space back to original image space
                text_content = self._process_coordinates(text_content, orig_size, model_size)
        
        return text_content
    
    def completion(self, *args, **kwargs) -> ModelResponse:
        """Synchronous completion method.
        
        Returns:
            ModelResponse with generated text
        """
        generated_text = self._generate(**kwargs)
        
        result = completion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)
    
    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        """Asynchronous completion method.
        
        Returns:
            ModelResponse with generated text
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, 
            functools.partial(self._generate, **kwargs)
        )
        
        result = await acompletion(
            model=f"mlx/{kwargs.get('model', 'mlx-community/UI-TARS-1.5-7B-4bit')}",
            mock_response=generated_text,
        )
        return cast(ModelResponse, result)
    
    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Synchronous streaming method.
        
        Returns:
            Iterator of GenericStreamingChunk
        """
        generated_text = self._generate(**kwargs)
        
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        
        yield generic_streaming_chunk
    
    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Asynchronous streaming method.
        
        Returns:
            AsyncIterator of GenericStreamingChunk
        """
        # Run _generate in thread pool to avoid blocking
        loop = asyncio.get_event_loop()
        generated_text = await loop.run_in_executor(
            self._executor, 
            functools.partial(self._generate, **kwargs)
        )
        
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": generated_text,
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        
        yield generic_streaming_chunk
```
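
A minimal usage sketch for this adapter via liteLLM's custom-provider mechanism. The import path below is an assumption for illustration; the registration pattern (`litellm.custom_provider_map`) is liteLLM's documented way to route a model prefix to a `CustomLLM` handler, and the `mlx/` prefix matches what this adapter reports in its own responses.

```python
# Sketch only: assumes mlx-vlm is installed on an Apple Silicon machine.
import litellm
from agent.adapters.mlxvlm import MLXVLMAdapter  # hypothetical import path

adapter = MLXVLMAdapter()
litellm.custom_provider_map = [
    {"provider": "mlx", "custom_handler": adapter},
]

response = litellm.completion(
    model="mlx/mlx-community/UI-TARS-1.5-7B-4bit",
    messages=[{"role": "user", "content": "Describe the screenshot."}],
    max_tokens=128,
)
print(response.choices[0].message.content)
```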

--------------------------------------------------------------------------------
/libs/python/agent/agent/loops/omniparser.py:
--------------------------------------------------------------------------------

```python
"""
OmniParser (Set-of-Marks) agent loop implementation using liteLLM
Paper: https://arxiv.org/abs/2408.00203
Code: https://github.com/microsoft/OmniParser
"""

import asyncio
import json
from typing import Dict, List, Any, AsyncGenerator, Union, Optional, Tuple
import litellm
import inspect
import base64

from ..decorators import register_agent
from ..types import Messages, AgentResponse, Tools, AgentCapability
from ..loops.base import AsyncAgentConfig

SOM_TOOL_SCHEMA = {
  "type": "function",
  "name": "computer",
  "description": "Control a computer by taking screenshots and interacting with UI elements. This tool shows screenshots with numbered elements overlaid on them. Each UI element has been assigned a unique ID number that you can see in the image. Use the element's ID number to interact with any element instead of pixel coordinates.",
  "parameters": {
    "type": "object",
    "properties": {
      "action": {
        "type": "string",
        "enum": [
          "screenshot",
          "click",
          "double_click",
          "drag",
          "type",
          "keypress",
          "scroll",
          "move",
          "wait",
          "get_current_url",
          "get_dimensions",
          "get_environment"
        ],
        "description": "The action to perform"
      },
      "element_id": {
        "type": "integer",
        "description": "The ID of the element to interact with (required for click, double_click, move, scroll actions, and as start/end for drag)"
      },
      "start_element_id": {
        "type": "integer",
        "description": "The ID of the element to start dragging from (required for drag action)"
      },
      "end_element_id": {
        "type": "integer",
        "description": "The ID of the element to drag to (required for drag action)"
      },
      "text": {
        "type": "string",
        "description": "The text to type (required for type action)"
      },
      "keys": {
        "type": "string",
        "description": "Key combination to press (required for keypress action). Single key for individual key press, multiple keys for combinations (e.g., 'ctrl+c')"
      },
      "button": {
        "type": "string",
        "description": "The mouse button to use for click action (left, right, wheel, back, forward) Default: left",
      },
      "scroll_x": {
        "type": "integer",
        "description": "Horizontal scroll amount for scroll action (positive for right, negative for left)",
      },
      "scroll_y": {
        "type": "integer",
        "description": "Vertical scroll amount for scroll action (positive for down, negative for up)",
      },
    },
    "required": [
      "action"
    ]
  }
}
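
# Example arguments the model is expected to emit for this tool (illustrative):
#   {"action": "click", "element_id": 12, "button": "left"}
#   {"action": "drag", "start_element_id": 3, "end_element_id": 7}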

OMNIPARSER_AVAILABLE = False
try:
    from som import OmniParser
    OMNIPARSER_AVAILABLE = True
except ImportError:
    pass
OMNIPARSER_SINGLETON = None

def get_parser():
    global OMNIPARSER_SINGLETON
    if OMNIPARSER_SINGLETON is None:
        OMNIPARSER_SINGLETON = OmniParser()
    return OMNIPARSER_SINGLETON
    
def get_last_computer_call_output(messages: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Get the last computer_call_output message from a messages list.
    
    Args:
        messages: List of messages to search through
        
    Returns:
        The last computer_call_output message dict, or None if not found
    """
    for message in reversed(messages):
        if isinstance(message, dict) and message.get("type") == "computer_call_output":
            return message
    return None

def _prepare_tools_for_omniparser(tool_schemas: List[Dict[str, Any]]) -> Tuple[Tools, dict]:
    """Prepare tools for OpenAI API format"""
    omniparser_tools = []
    id2xy = dict()
    
    for schema in tool_schemas:
        if schema["type"] == "computer":
            omniparser_tools.append(SOM_TOOL_SCHEMA)
            if "id2xy" in schema:
                id2xy = schema["id2xy"]
            else:
                schema["id2xy"] = id2xy
        elif schema["type"] == "function":
            # Function tools use OpenAI-compatible schema directly (liteLLM expects this format)
            # Schema should be: {type, name, description, parameters}
            omniparser_tools.append({ "type": "function", **schema["function"] })
    
    return omniparser_tools, id2xy

async def replace_function_with_computer_call(item: Dict[str, Any], id2xy: Dict[int, Tuple[float, float]]):
  item_type = item.get("type")

  def _get_xy(element_id: Optional[int]) -> Union[Tuple[float, float], Tuple[None, None]]:
    if element_id is None:
      return (None, None)
    return id2xy.get(element_id, (None, None))

  if item_type == "function_call":
    fn_name = item.get("name")
    fn_args = json.loads(item.get("arguments", "{}"))

    item_id = item.get("id")
    call_id = item.get("call_id")
    
    if fn_name == "computer":
      action = fn_args.get("action")
      element_id = fn_args.get("element_id")
      start_element_id = fn_args.get("start_element_id")
      end_element_id = fn_args.get("end_element_id")
      text = fn_args.get("text")
      keys = fn_args.get("keys")
      button = fn_args.get("button")
      scroll_x = fn_args.get("scroll_x")
      scroll_y = fn_args.get("scroll_y")

      x, y = _get_xy(element_id)
      start_x, start_y = _get_xy(start_element_id)
      end_x, end_y = _get_xy(end_element_id)

      action_args = {
          "type": action,
          "x": x,
          "y": y,
          "start_x": start_x,
          "start_y": start_y,
          "end_x": end_x,
          "end_y": end_y,
          "text": text,
          "keys": keys,
          "button": button,
          "scroll_x": scroll_x,
          "scroll_y": scroll_y
        }
      # Remove None values to keep the JSON clean
      action_args = {k: v for k, v in action_args.items() if v is not None}

      return [{
        "type": "computer_call",
        "action": action_args,
        "id": item_id,
        "call_id": call_id,
        "status": "completed"
      }]

  return [item]

async def replace_computer_call_with_function(item: Dict[str, Any], xy2id: Dict[Tuple[float, float], int]):
    """
    Convert computer_call back to function_call format.
    Also handles computer_call_output -> function_call_output conversion.
    
    Args:
        item: The item to convert
        xy2id: Mapping from (x, y) coordinates to element IDs
    """
    item_type = item.get("type")

    def _get_element_id(x: Optional[float], y: Optional[float]) -> Optional[int]:
        """Get element ID from coordinates, return None if coordinates are None"""
        if x is None or y is None:
            return None
        return xy2id.get((x, y))

    if item_type == "computer_call":
        action_data = item.get("action", {})
        
        # Extract coordinates and convert back to element IDs
        element_id = _get_element_id(action_data.get("x"), action_data.get("y"))
        start_element_id = _get_element_id(action_data.get("start_x"), action_data.get("start_y"))
        end_element_id = _get_element_id(action_data.get("end_x"), action_data.get("end_y"))
        
        # Build function arguments
        fn_args = {
            "action": action_data.get("type"),
            "element_id": element_id,
            "start_element_id": start_element_id,
            "end_element_id": end_element_id,
            "text": action_data.get("text"),
            "keys": action_data.get("keys"),
            "button": action_data.get("button"),
            "scroll_x": action_data.get("scroll_x"),
            "scroll_y": action_data.get("scroll_y")
        }
        
        # Remove None values to keep the JSON clean
        fn_args = {k: v for k, v in fn_args.items() if v is not None}
        
        return [{
            "type": "function_call",
            "name": "computer",
            "arguments": json.dumps(fn_args),
            "id": item.get("id"),
            "call_id": item.get("call_id"),
            "status": "completed",

            # Fall back to string representation
            "content": f"Used tool: {action_data.get("type")}({json.dumps(fn_args)})"
        }]
    
    elif item_type == "computer_call_output":
        # Simple conversion: computer_call_output -> function_call_output
        return [{
            "type": "function_call_output",
            "call_id": item.get("call_id"),
            "content": [item.get("output")],
            "id": item.get("id"),
            "status": "completed"
        }]

    return [item]


@register_agent(models=r"omniparser\+.*|omni\+.*", priority=2)
class OmniparserConfig(AsyncAgentConfig):
    """Omniparser agent configuration implementing AsyncAgentConfig protocol."""
    
    async def predict_step(
        self,
        messages: List[Dict[str, Any]],
        model: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        max_retries: Optional[int] = None,
        stream: bool = False,
        computer_handler=None,
        use_prompt_caching: Optional[bool] = False,
        _on_api_start=None,
        _on_api_end=None,
        _on_usage=None,
        _on_screenshot=None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        OmniParser (Set-of-Marks) agent loop using liteLLM responses.

        Annotates the latest screenshot with numbered UI elements, lets any
        liteLLM-compatible model reference those element IDs, and maps the IDs
        back to screen coordinates before returning computer calls.
        """
        if not OMNIPARSER_AVAILABLE:
            raise ValueError("omniparser loop requires som to be installed. Install it with `pip install cua-som`.")
          
        tools = tools or []
        
        llm_model = model.split('+')[-1]

        # Prepare tools for OpenAI API
        openai_tools, id2xy = _prepare_tools_for_omniparser(tools)

        # Find last computer_call_output
        last_computer_call_output = get_last_computer_call_output(messages) # type: ignore
        if last_computer_call_output:
            image_url = last_computer_call_output.get("output", {}).get("image_url", "")
            image_data = image_url.split(",")[-1]
            if image_data:
                parser = get_parser()
                result = parser.parse(image_data)
                if _on_screenshot:
                    await _on_screenshot(result.annotated_image_base64, "annotated_image")
                for element in result.elements:
                    id2xy[element.id] = ((element.bbox.x1 + element.bbox.x2) / 2, (element.bbox.y1 + element.bbox.y2) / 2)
        
        # handle computer calls -> function calls
        new_messages = []
        for message in messages:
            if not isinstance(message, dict):
                message = message.__dict__
            new_messages += await replace_computer_call_with_function(message, id2xy) # type: ignore
        messages = new_messages

        # Prepare API call kwargs
        api_kwargs = {
            "model": llm_model,
            "input": messages,
            "tools": openai_tools if openai_tools else None,
            "stream": stream,
            "truncation": "auto",
            "num_retries": max_retries,
            **kwargs
        }
        
        # Call API start hook
        if _on_api_start:
            await _on_api_start(api_kwargs)
        
        print(str(api_kwargs)[:1000])

        # Use liteLLM responses
        response = await litellm.aresponses(**api_kwargs)

        # Call API end hook
        if _on_api_end:
            await _on_api_end(api_kwargs, response)

        # Extract usage information
        usage = {
            **response.usage.model_dump(), # type: ignore
            "response_cost": response._hidden_params.get("response_cost", 0.0), # type: ignore
        }
        if _on_usage:
            await _on_usage(usage)

        # handle som function calls -> xy computer calls
        new_output = []
        for i in range(len(response.output)): # type: ignore
          new_output += await replace_function_with_computer_call(response.output[i].model_dump(), id2xy) # type: ignore
        
        return {
            "output": new_output,
            "usage": usage
        }
    
    async def predict_click(
        self,
        model: str,
        image_b64: str,
        instruction: str,
        **kwargs
    ) -> Optional[Tuple[float, float]]:
        """
        Predict click coordinates using OmniParser and LLM.
        
        Uses OmniParser to annotate the image with element IDs, then uses LLM
        to identify the correct element ID based on the instruction.
        """
        if not OMNIPARSER_AVAILABLE:
            return None
        
        # Parse the image with OmniParser to get annotated image and elements
        parser = get_parser()
        result = parser.parse(image_b64)
        
        # Extract the LLM model from composed model string
        llm_model = model.split('+')[-1]
        
        # Create system prompt for element ID prediction
        SYSTEM_PROMPT = f'''
You are an expert UI element locator. Given a GUI image annotated with numerical IDs over each interactable element, along with a user's element description, provide the ID of the specified element.

The image shows UI elements with numbered overlays. Each number corresponds to a clickable/interactable element.

Output only the element ID as a single integer.
'''.strip()
        
        # Prepare messages for LLM
        messages = [
            {
                "role": "system",
                "content": SYSTEM_PROMPT
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{result.annotated_image_base64}"
                        }
                    },
                    {
                        "type": "text",
                        "text": f"Find the element: {instruction}"
                    }
                ]
            }
        ]
        
        # Call LLM to predict element ID
        response = await litellm.acompletion(
            model=llm_model,
            messages=messages,
            max_tokens=10,
            temperature=0.1
        )
        
        # Extract element ID from response
        response_text = response.choices[0].message.content.strip() # type: ignore
        
        # Try to parse the element ID
        try:
            element_id = int(response_text)
            
            # Find the element with this ID and return its center coordinates
            for element in result.elements:
                if element.id == element_id:
                    center_x = (element.bbox.x1 + element.bbox.x2) / 2
                    center_y = (element.bbox.y1 + element.bbox.y2) / 2
                    return (center_x, center_y)
        except ValueError:
            # If we can't parse the ID, return None
            pass
            
        return None
    
    def get_capabilities(self) -> List[AgentCapability]:
        """Return the capabilities supported by this agent."""
        return ["step"]

```
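
A small sketch of the ID-to-coordinate translation performed above: a `computer` function call that references an element ID is rewritten into a `computer_call` whose action carries the concrete x/y coordinates from the `id2xy` map. The import path is hypothetical; the function and the message shapes are the ones defined in this file.

```python
# Sketch only: assumes this module is importable as agent.loops.omniparser.
import asyncio
import json

from agent.loops.omniparser import replace_function_with_computer_call  # hypothetical path

id2xy = {12: (640.0, 360.0)}  # element 12 -> center of its bounding box
function_call = {
    "type": "function_call",
    "name": "computer",
    "arguments": json.dumps({"action": "click", "element_id": 12, "button": "left"}),
    "id": "fc_1",
    "call_id": "call_1",
}

converted = asyncio.run(replace_function_with_computer_call(function_call, id2xy))
print(converted)
# [{'type': 'computer_call',
#   'action': {'type': 'click', 'x': 640.0, 'y': 360.0, 'button': 'left'},
#   'id': 'fc_1', 'call_id': 'call_1', 'status': 'completed'}]
```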

--------------------------------------------------------------------------------
/libs/python/computer/computer/interface/base.py:
--------------------------------------------------------------------------------

```python
"""Base interface for computer control."""

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Tuple, List
from ..logger import Logger, LogLevel
from .models import MouseButton, CommandResult

class BaseComputerInterface(ABC):
    """Base class for computer control interfaces."""

    def __init__(self, ip_address: str, username: str = "lume", password: str = "lume", api_key: Optional[str] = None, vm_name: Optional[str] = None):
        """Initialize interface.

        Args:
            ip_address: IP address of the computer to control
            username: Username for authentication
            password: Password for authentication
            api_key: Optional API key for cloud authentication
            vm_name: Optional VM name for cloud authentication
        """
        self.ip_address = ip_address
        self.username = username
        self.password = password
        self.api_key = api_key
        self.vm_name = vm_name
        self.logger = Logger("cua.interface", LogLevel.NORMAL)
        
        # Optional default delay time between commands (in seconds)
        self.delay: float = 0.0

    @abstractmethod
    async def wait_for_ready(self, timeout: int = 60) -> None:
        """Wait for interface to be ready.

        Args:
            timeout: Maximum time to wait in seconds

        Raises:
            TimeoutError: If interface is not ready within timeout
        """
        pass

    @abstractmethod
    def close(self) -> None:
        """Close the interface connection."""
        pass

    def force_close(self) -> None:
        """Force close the interface connection.

        By default, this just calls close(), but subclasses can override
        to provide more forceful cleanup.
        """
        self.close()

    # Mouse Actions
    @abstractmethod
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left", delay: Optional[float] = None) -> None:
        """Press and hold a mouse button.
        
        Args:
            x: X coordinate to press at. If None, uses current cursor position.
            y: Y coordinate to press at. If None, uses current cursor position.
            button: Mouse button to press ('left', 'middle', 'right').
            delay: Optional delay in seconds after the action
        """
        pass
    
    @abstractmethod
    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left", delay: Optional[float] = None) -> None:
        """Release a mouse button.
        
        Args:
            x: X coordinate to release at. If None, uses current cursor position.
            y: Y coordinate to release at. If None, uses current cursor position.
            button: Mouse button to release ('left', 'middle', 'right').
            delay: Optional delay in seconds after the action
        """
        pass
    
    @abstractmethod
    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Perform a left mouse button click.
        
        Args:
            x: X coordinate to click at. If None, uses current cursor position.
            y: Y coordinate to click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """
        pass

    @abstractmethod
    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Perform a right mouse button click.
        
        Args:
            x: X coordinate to click at. If None, uses current cursor position.
            y: Y coordinate to click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """
        pass

    @abstractmethod
    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
        """Perform a double left mouse button click.
        
        Args:
            x: X coordinate to double-click at. If None, uses current cursor position.
            y: Y coordinate to double-click at. If None, uses current cursor position.
            delay: Optional delay in seconds after the action
        """
        pass

    @abstractmethod
    async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Move the cursor to the specified screen coordinates.
        
        Args:
            x: X coordinate to move cursor to.
            y: Y coordinate to move cursor to.
            delay: Optional delay in seconds after the action
        """
        pass

    @abstractmethod
    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
        """Drag from current position to specified coordinates.

        Args:
            x: The x coordinate to drag to
            y: The y coordinate to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
            delay: Optional delay in seconds after the action
        """
        pass

    @abstractmethod
    async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
        """Drag the cursor along a path of coordinates.

        Args:
            path: List of (x, y) coordinate tuples defining the drag path
            button: The mouse button to use ('left', 'middle', 'right')
            duration: Total time in seconds that the drag operation should take
            delay: Optional delay in seconds after the action
        """
        pass

    # Keyboard Actions
    @abstractmethod
    async def key_down(self, key: str, delay: Optional[float] = None) -> None:
        """Press and hold a key.
        
        Args:
            key: The key to press and hold (e.g., 'a', 'shift', 'ctrl').
            delay: Optional delay in seconds after the action.
        """
        pass
    
    @abstractmethod
    async def key_up(self, key: str, delay: Optional[float] = None) -> None:
        """Release a previously pressed key.
        
        Args:
            key: The key to release (e.g., 'a', 'shift', 'ctrl').
            delay: Optional delay in seconds after the action.
        """
        pass
    
    @abstractmethod
    async def type_text(self, text: str, delay: Optional[float] = None) -> None:
        """Type the specified text string.
        
        Args:
            text: The text string to type.
            delay: Optional delay in seconds after the action.
        """
        pass

    @abstractmethod
    async def press_key(self, key: str, delay: Optional[float] = None) -> None:
        """Press and release a single key.
        
        Args:
            key: The key to press (e.g., 'a', 'enter', 'escape').
            delay: Optional delay in seconds after the action.
        """
        pass

    @abstractmethod
    async def hotkey(self, *keys: str, delay: Optional[float] = None) -> None:
        """Press multiple keys simultaneously (keyboard shortcut).
        
        Args:
            *keys: Variable number of keys to press together (e.g., 'ctrl', 'c').
            delay: Optional delay in seconds after the action.
        """
        pass

    # Scrolling Actions
    @abstractmethod
    async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
        """Scroll the mouse wheel by specified amounts.
        
        Args:
            x: Horizontal scroll amount (positive = right, negative = left).
            y: Vertical scroll amount (positive = up, negative = down).
            delay: Optional delay in seconds after the action.
        """
        pass
    
    @abstractmethod
    async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Scroll down by the specified number of clicks.
        
        Args:
            clicks: Number of scroll clicks to perform downward.
            delay: Optional delay in seconds after the action.
        """
        pass

    @abstractmethod
    async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
        """Scroll up by the specified number of clicks.
        
        Args:
            clicks: Number of scroll clicks to perform upward.
            delay: Optional delay in seconds after the action.
        """
        pass

    # Screen Actions
    @abstractmethod
    async def screenshot(self) -> bytes:
        """Take a screenshot.

        Returns:
            Raw bytes of the screenshot image
        """
        pass

    @abstractmethod
    async def get_screen_size(self) -> Dict[str, int]:
        """Get the screen dimensions.

        Returns:
            Dict with 'width' and 'height' keys
        """
        pass

    @abstractmethod
    async def get_cursor_position(self) -> Dict[str, int]:
        """Get the current cursor position on screen.
        
        Returns:
            Dict with 'x' and 'y' keys containing cursor coordinates.
        """
        pass

    # Clipboard Actions
    @abstractmethod
    async def copy_to_clipboard(self) -> str:
        """Get the current clipboard content.
        
        Returns:
            The text content currently stored in the clipboard.
        """
        pass

    @abstractmethod
    async def set_clipboard(self, text: str) -> None:
        """Set the clipboard content to the specified text.
        
        Args:
            text: The text to store in the clipboard.
        """
        pass

    # File System Actions
    @abstractmethod
    async def file_exists(self, path: str) -> bool:
        """Check if a file exists at the specified path.
        
        Args:
            path: The file path to check.
            
        Returns:
            True if the file exists, False otherwise.
        """
        pass

    @abstractmethod
    async def directory_exists(self, path: str) -> bool:
        """Check if a directory exists at the specified path.
        
        Args:
            path: The directory path to check.
            
        Returns:
            True if the directory exists, False otherwise.
        """
        pass
    
    @abstractmethod
    async def list_dir(self, path: str) -> List[str]:
        """List the contents of a directory.
        
        Args:
            path: The directory path to list.
            
        Returns:
            List of file and directory names in the specified directory.
        """
        pass
    
    @abstractmethod
    async def read_text(self, path: str) -> str:
        """Read the text contents of a file.
        
        Args:
            path: The file path to read from.
            
        Returns:
            The text content of the file.
        """
        pass
    
    @abstractmethod
    async def write_text(self, path: str, content: str) -> None:
        """Write text content to a file.
        
        Args:
            path: The file path to write to.
            content: The text content to write.
        """
        pass
    
    @abstractmethod
    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> bytes:
        """Read file binary contents with optional seeking support.
        
        Args:
            path: Path to the file
            offset: Byte offset to start reading from (default: 0)
            length: Number of bytes to read (default: None for entire file)
        """
        pass
    
    @abstractmethod
    async def write_bytes(self, path: str, content: bytes) -> None:
        """Write binary content to a file.
        
        Args:
            path: The file path to write to.
            content: The binary content to write.
        """
        pass
    
    @abstractmethod
    async def delete_file(self, path: str) -> None:
        """Delete a file at the specified path.
        
        Args:
            path: The file path to delete.
        """
        pass
    
    @abstractmethod
    async def create_dir(self, path: str) -> None:
        """Create a directory at the specified path.
        
        Args:
            path: The directory path to create.
        """
        pass
    
    @abstractmethod
    async def delete_dir(self, path: str) -> None:
        """Delete a directory at the specified path.
        
        Args:
            path: The directory path to delete.
        """
        pass
    
    @abstractmethod
    async def get_file_size(self, path: str) -> int:
        """Get the size of a file in bytes.
        
        Args:
            path: The file path to get the size of.
            
        Returns:
            The size of the file in bytes.
        """
        pass
    
    @abstractmethod
    async def run_command(self, command: str) -> CommandResult:
        """Run shell command and return structured result.
        
        Executes a shell command using subprocess.run with shell=True and check=False.
        The command is run in the target environment and captures both stdout and stderr.
        
        Args:
            command (str): The shell command to execute
            
        Returns:
            CommandResult: A structured result containing:
                - stdout (str): Standard output from the command
                - stderr (str): Standard error from the command  
                - returncode (int): Exit code from the command (0 indicates success)
                
        Raises:
            RuntimeError: If the command execution fails at the system level
            
        Example:
            result = await interface.run_command("ls -la")
            if result.returncode == 0:
                print(f"Output: {result.stdout}")
            else:
                print(f"Error: {result.stderr}, Exit code: {result.returncode}")
        """
        pass

    # Accessibility Actions
    @abstractmethod
    async def get_accessibility_tree(self) -> Dict:
        """Get the accessibility tree of the current screen.
        
        Returns:
            Dict containing the hierarchical accessibility information of screen elements.
        """
        pass
    
    @abstractmethod
    async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert screenshot coordinates to screen coordinates.

        Args:
            x: X coordinate in screenshot space
            y: Y coordinate in screenshot space

        Returns:
            tuple[float, float]: (x, y) coordinates in screen space
        """
        pass

    @abstractmethod
    async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
        """Convert screen coordinates to screenshot coordinates.

        Args:
            x: X coordinate in screen space
            y: Y coordinate in screen space

        Returns:
            tuple[float, float]: (x, y) coordinates in screenshot space
        """
        pass

```
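
A brief usage sketch against this interface. It assumes a concrete subclass (implemented elsewhere in the library) has already been constructed as `interface`; the calls below rely only on the abstract methods declared above.

```python
# Sketch only: `interface` must be an instance of a concrete BaseComputerInterface subclass.
import asyncio

async def demo(interface) -> None:
    await interface.wait_for_ready(timeout=60)

    size = await interface.get_screen_size()
    await interface.move_cursor(size["width"] // 2, size["height"] // 2)
    await interface.left_click()
    await interface.type_text("hello")

    result = await interface.run_command("ls -la")
    if result.returncode == 0:
        print(result.stdout)

    png_bytes = await interface.screenshot()
    await interface.write_bytes("/tmp/screenshot.png", png_bytes)
    interface.close()
```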

--------------------------------------------------------------------------------
/libs/lumier/src/lib/vm.sh:
--------------------------------------------------------------------------------

```bash
#!/usr/bin/env bash

# Initialize global flags
export PULL_IN_PROGRESS=0

start_vm() {
    # Determine storage path for VM
    STORAGE_PATH="$HOST_STORAGE_PATH"
    if [ -z "$STORAGE_PATH" ]; then
        STORAGE_PATH="storage_${VM_NAME}"
    fi

    # Check if VM exists and its status using JSON format - quietly
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")

    # Check for a "Virtual machine not found" error
    if [[ $VM_INFO == *"Virtual machine not found"* ]]; then
        IMAGE_NAME="${VERSION##*/}"
        # Parse registry and organization from VERSION
        REGISTRY=$(echo $VERSION | cut -d'/' -f1)
        ORGANIZATION=$(echo $VERSION | cut -d'/' -f2)
        
        echo "Pulling VM image $IMAGE_NAME..."
        lume_pull "$IMAGE_NAME" "$VM_NAME" "$STORAGE_PATH" "$REGISTRY" "$ORGANIZATION"
    else
        # Parse the JSON status - check if it contains "status" : "running"
        if [[ $VM_INFO == *'"status" : "running"'* ]]; then
            lume_stop "$VM_NAME" "$STORAGE_PATH"
        fi
    fi

    # Format memory size for display purposes
    MEMORY_DISPLAY="$RAM_SIZE"
    if [[ ! "$RAM_SIZE" == *"GB"* && ! "$RAM_SIZE" == *"MB"* ]]; then
        MEMORY_DISPLAY="${RAM_SIZE}MB"
    fi
    
    # Set VM parameters using the wrapper function
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Updating VM settings: cpu=$CPU_CORES memory=$MEMORY_DISPLAY display=$DISPLAY"
    fi
    lume_set "$VM_NAME" "$STORAGE_PATH" "$CPU_CORES" "$RAM_SIZE" "$DISPLAY"

    # Fetch VM configuration - quietly (don't display to console)
    CONFIG_JSON=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
    
    # Setup shared directory args if necessary
    SHARED_DIR_ARGS=""
    if [ -d "/shared" ]; then
        if [ -n "$HOST_SHARED_PATH" ]; then
            SHARED_DIR_ARGS="--shared-dir=$HOST_SHARED_PATH"
        else
            echo "Warning: /shared volume exists but HOST_SHARED_PATH is not set. Cannot mount volume."
        fi
    fi

    # Run VM with VNC and shared directory using curl
    lume_run $SHARED_DIR_ARGS --storage "$STORAGE_PATH" "$VM_NAME" &
    # lume run "$VM_NAME" --storage "$STORAGE_PATH" --no-display

    # sleep 10000000

    # Wait for VM to be running and VNC URL to be available
    vm_ip=""
    vnc_url=""
    max_attempts=30
    attempt=0
    
    while [ $attempt -lt $max_attempts ]; do
            # Get VM info as JSON using the API function - pass debug flag
        VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
        
        # Extract status, IP address, and VNC URL using the helper function
        vm_status=$(extract_json_field "status" "$VM_INFO")
        vm_ip=$(extract_json_field "ipAddress" "$VM_INFO")
        vnc_url=$(extract_json_field "vncUrl" "$VM_INFO")

        # Check if VM status is 'running' and we have IP and VNC URL
        if [ "$vm_status" = "running" ] && [ -n "$vm_ip" ] && [ -n "$vnc_url" ]; then
            break
        fi
        
        sleep 2
        attempt=$((attempt + 1))
    done
    
    if [ -z "$vm_ip" ] || [ -z "$vnc_url" ]; then
        echo "Timed out waiting for VM to start or VNC URL to become available."
        lume_stop "$VM_NAME" "$STORAGE_PATH" > /dev/null 2>&1
        # lume stop "$VM_NAME" --storage "$STORAGE_PATH" > /dev/null 2>&1
        exit 1
    fi

    # Parse VNC URL to extract password and port
    VNC_PASSWORD=$(echo "$vnc_url" | sed -n 's/.*:\(.*\)@.*/\1/p')
    VNC_PORT=$(echo "$vnc_url" | sed -n 's/.*:\([0-9]\+\)$/\1/p')
    
    # Wait for SSH to become available
    wait_for_ssh "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" 5 20

    # Export VNC variables for entry.sh to use
    export VNC_PORT
    export VNC_PASSWORD
    
    # Execute on-logon.sh if present
    on_logon_script="/run/lifecycle/on-logon.sh"
    
    # Only show detailed logs in debug mode
    if [ "${LUMIER_DEBUG:-0}" == "1" ]; then
        echo "Running on-logon.sh hook script on VM..."
    fi
    
    # Check if script exists
    if [ ! -f "$on_logon_script" ]; then
        echo "Warning: on-logon.sh hook script not found at $on_logon_script"
    else
        # Execute the remote script
        execute_remote_script "$vm_ip" "$HOST_USER" "$HOST_PASSWORD" "$on_logon_script" "$VNC_PASSWORD" "$HOST_SHARED_PATH"
    fi
}

# Get VM information using curl
lume_get() {
    local vm_name="$1"
    local storage="$2"
    local format="${3:-json}"
    local debug="${4:-false}"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # URL encode the storage path for the query parameter
    # Replace special characters with their URL encoded equivalents
    local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g')
    
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
    
    # Construct the full curl command
    local curl_cmd="curl --connect-timeout 6000 --max-time 5000 -s '$api_url'"
    
    # Print debug info (to stderr so it is not captured by callers using command substitution)
    if [[ "$debug" == "true" || "$LUMIER_DEBUG" == "1" ]]; then
        echo "[DEBUG] Calling API: $api_url" >&2
        echo "[DEBUG] Full curl command: $curl_cmd" >&2
    fi
    
    # Log curl commands only when in debug mode
    if [[ "$debug" == "true" || "$LUMIER_DEBUG" == "1" ]]; then
        echo "[$(date -u '+%Y-%m-%dT%H:%M:%SZ')] DEBUG: Executing curl request: $api_url" >&2
    fi
    
    # Make the API call
    local response=$(curl --connect-timeout 6000 \
      --max-time 5000 \
      -s \
      "$api_url")
    
    # Print the response to stderr if debugging is enabled (stdout carries the JSON response for callers)
    if [[ "$debug" == "true" || "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "[DEBUG] API Response:" >&2
        { echo "$response" | jq '.' 2>/dev/null || echo "$response"; } >&2
    fi
    
    # Output the response so callers can capture it
    echo "$response"
}

# Set VM properties using curl
lume_set() {
    local vm_name="$1"
    local storage="$2"
    local cpu="${3:-4}"
    local memory="${4:-8192}"
    local display="${5:-1024x768}"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Handle memory format for the API
    if [[ "$memory" == *"GB"* ]]; then
        # Already in GB format, keep as is
        :  # No-op
    elif [[ "$memory" =~ ^[0-9]+$ ]]; then
        # If memory is a simple number, assume MB and convert to GB
        memory="$(awk "BEGIN { printf \"%.1f\", $memory/1024 }")GB"
    fi
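    # e.g. RAM_SIZE=8192 becomes "8.0GB"; values already containing "GB" pass through unchanged.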
    
    # Only show memory formatting debug in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "[DEBUG] Formatted memory value: $memory"
    fi
    
    # Store response to conditionally show based on debug mode
    local response=$(curl --connect-timeout 6000 \
      --max-time 5000 \
      -s \
      -X PATCH \
      -H "Content-Type: application/json" \
      -d "{
        \"cpu\": $cpu,
        \"memory\": \"$memory\",
        \"display\": \"$display\",
        \"storage\": \"$storage\"
      }" \
      "http://${api_host}:${api_port}/lume/vms/${vm_name}")
      
    # Only show response in debug mode
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        echo "$response"
    fi
}

stop_vm() {
    local in_cleanup=${1:-false} # Optional first argument to indicate if called from cleanup trap
    echo "Stopping VM '$VM_NAME'..."
    STORAGE_PATH="$HOST_STORAGE_PATH"
    
    # Only show storage path in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "STORAGE_PATH: $STORAGE_PATH"
    fi
    
    VM_INFO=$(lume_get "$VM_NAME" "$STORAGE_PATH" "json" "${LUMIER_DEBUG:-0}")
    vm_status=$(extract_json_field "status" "$VM_INFO")

    if [ "$vm_status" == "running" ]; then
        lume_stop "$VM_NAME" "$STORAGE_PATH"
    elif [ "$vm_status" == "stopped" ]; then
        echo "VM '$VM_NAME' is already stopped."
    elif [ "$in_cleanup" = true ]; then
        # If we are in the cleanup trap and status is unknown or VM not found, 
        # still attempt a stop just in case.
        echo "VM status is unknown ('$vm_status') or VM not found during cleanup. Attempting stop anyway."
        lume_stop "$VM_NAME" "$STORAGE_PATH"
        sleep 5
        echo "VM '$VM_NAME' stop command issued as a precaution."
    else
        echo "VM status is unknown ('$vm_status') or VM not found. Not attempting stop."
    fi
}

is_vm_running() {
    # Check VM status using the API function
    local vm_info
    vm_info=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH")
    if [[ $vm_info == *'"status" : "running"'* ]]; then
        return 0 # Running
    else
        return 1 # Not running or doesn't exist
    fi
    # lume ls | grep -q "$VM_NAME" # Old CLI check
}

# Stop VM with storage location specified using curl
lume_stop() {
    local vm_name="$1"
    local storage="$2"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Stopping VM $vm_name..."
    fi
    
    # Execute command and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show output in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X POST \
          -H "Content-Type: application/json" \
          -d '{"storage":"'$storage'"}' \
          "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop")
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X POST \
          -H "Content-Type: application/json" \
          -d '{"storage":"'$storage'"}' \
          "http://${api_host}:${api_port}/lume/vms/${vm_name}/stop")
    fi
}

# Pull a VM image using curl
lume_pull() {
    local image="$1"      # Image name with tag
    local vm_name="$2"    # Name for the new VM
    local storage="$3"    # Storage location
    local registry="${4:-ghcr.io}"  # Registry, default is ghcr.io
    local organization="${5:-trycua}" # Organization, default is trycua
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # Mark that pull is in progress for interrupt handling
    export PULL_IN_PROGRESS=1
    
    # Only log full details in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Pulling image $image from $registry/$organization..."
    else
        echo "Pulling image $image..."
    fi
    
    # Inform users how to check pull progress
    echo "You can check the pull progress using: lume logs -f"
    
    # Pull image via API and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show full response in debug mode - no timeout limits
        response=$(curl \
          -X POST \
          -H "Content-Type: application/json" \
          -d "{
            \"image\": \"$image\",
            \"name\": \"$vm_name\",
            \"registry\": \"$registry\",
            \"organization\": \"$organization\",
            \"storage\": \"$storage\"
          }" \
          "http://${api_host}:${api_port}/lume/pull")
        echo "$response"
    else
        # Run silently in normal mode - no timeout limits
        response=$(curl \
          -s \
          -X POST \
          -H "Content-Type: application/json" \
          -d "{
            \"image\": \"$image\",
            \"name\": \"$vm_name\",
            \"registry\": \"$registry\",
            \"organization\": \"$organization\",
            \"storage\": \"$storage\"
          }" \
          "http://${api_host}:${api_port}/lume/pull")
    fi
    
    # Unset pull in progress flag
    export PULL_IN_PROGRESS=0
}


# Run VM with VNC client started and shared directory using curl
lume_run() {
    # Parse args
    local shared_dir=""
    local storage=""
    local vm_name="lume_vm"
    local no_display=true
    while [[ $# -gt 0 ]]; do
        case $1 in
            --shared-dir=*)
                shared_dir="${1#*=}"
                shift
                ;;
            --storage)
                storage="$2"
                shift 2
                ;;
            --no-display)
                no_display=true
                shift
                ;;
            *)
                # Assume last arg is VM name if not an option
                vm_name="$1"
                shift
                ;;
        esac
    done
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"

    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Running VM $vm_name..."
    fi
    
    # Build the JSON body dynamically based on what's provided
    local json_body="{\"noDisplay\": true"
    
    # Only include shared directories if shared_dir is provided
    if [[ -n "$shared_dir" ]]; then
        json_body+=", \"sharedDirectories\": [{\"hostPath\": \"$shared_dir\", \"readOnly\": false}]"
    fi
    
    # Only include storage if it's provided
    if [[ -n "$storage" ]]; then
        json_body+=", \"storage\": \"$storage\""
    fi
    
    # Add recovery mode (always false)
    json_body+=", \"recoveryMode\": false}"

    # Execute the command and store the response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show response in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X POST \
          -H 'Content-Type: application/json' \
          -d "$json_body" \
          http://${api_host}:${api_port}/lume/vms/$vm_name/run)
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X POST \
          -H 'Content-Type: application/json' \
          -d "$json_body" \
          http://${api_host}:${api_port}/lume/vms/$vm_name/run)
    fi
}

# Delete a VM using curl
lume_delete() {
    local vm_name="$1"
    local storage="$2"
    
    local api_host="${LUME_API_HOST:-host.docker.internal}"
    local api_port="${LUME_API_PORT:-7777}"
    
    # URL encode the storage path for the query parameter
    # Replace special characters with their URL encoded equivalents
    local encoded_storage=$(echo "$storage" | sed 's/\//%2F/g' | sed 's/ /%20/g' | sed 's/:/%3A/g')
    
    # Construct API URL with encoded storage parameter
    local api_url="http://${api_host}:${api_port}/lume/vms/${vm_name}?storage=${encoded_storage}"
    
    # Only log in debug mode
    if [[ "$LUMIER_DEBUG" == "1" ]]; then
        echo "Deleting VM $vm_name from storage $storage..."
    fi
    
    # Execute command and capture response
    local response
    if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then
        # Show output in debug mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -X DELETE \
          "$api_url")
        echo "$response"
    else
        # Run silently in normal mode
        response=$(curl --connect-timeout 6000 \
          --max-time 5000 \
          -s \
          -X DELETE \
          "$api_url")
    fi
}
```
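
These helpers are thin curl wrappers around the Lume API exposed by the host. For reference, the manual request equivalent to `lume_get`, using the default host/port above, looks like this (VM and storage names are illustrative):

```bash
# Query a VM's status directly from the Lume API (same endpoint lume_get wraps).
curl -s "http://host.docker.internal:7777/lume/vms/my-vm?storage=storage_my-vm" | jq -r '.status'
```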

--------------------------------------------------------------------------------
/libs/python/agent/benchmarks/utils.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Shared utilities for ScreenSpot-Pro benchmarking and interactive testing.
"""

import dotenv
dotenv.load_dotenv()

import asyncio
import base64
import os
import sys
import subprocess as sp
import statistics
from datetime import datetime
from io import BytesIO
from typing import List, Union, Tuple, Optional

from PIL import Image, ImageDraw
from tqdm import tqdm
import gc
import torch

# Add parent directory to path for imports
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from agent.agent import ComputerAgent
from models.base import ModelProtocol

def get_gpu_memory() -> List[int]:
    """
    Get GPU memory usage using nvidia-smi.
    
    Returns:
        List of free memory values in MB for each GPU
    """
    try:
        command = "nvidia-smi --query-gpu=memory.free --format=csv"
        memory_free_info = sp.check_output(command.split()).decode('ascii').split('\n')[:-1][1:]
        memory_free_values = [int(x.split()[0]) for x in memory_free_info]
        return memory_free_values
    except (sp.CalledProcessError, FileNotFoundError, IndexError):
        # Fallback to torch if nvidia-smi is not available
        if torch.cuda.is_available():
            device = torch.cuda.current_device()
            total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
            reserved = torch.cuda.memory_reserved(device) / 1024 / 1024
            return [int(total - reserved)]
        return [0]


def get_vram_usage() -> dict:
    """
    Get current VRAM usage statistics.
    
    Returns:
        Dictionary with VRAM usage info (in MB)
    """
    if torch.cuda.is_available():
        device = torch.cuda.current_device()
        allocated = torch.cuda.memory_allocated(device) / 1024 / 1024  # Convert to MB
        reserved = torch.cuda.memory_reserved(device) / 1024 / 1024   # Convert to MB
        total = torch.cuda.get_device_properties(device).total_memory / 1024 / 1024
        return {
            'allocated_mb': allocated,
            'reserved_mb': reserved,
            'total_mb': total,
            'free_mb': total - reserved
        }
    else:
        return {
            'allocated_mb': 0.0,
            'reserved_mb': 0.0,
            'total_mb': 0.0,
            'free_mb': 0.0
        }


def get_available_models() -> List[Union[str, ModelProtocol]]:
    """
    Get list of available models for testing.
    
    Returns:
        List of model strings and model classes
    """
    local_provider = "huggingface-local/"  # Options: huggingface-local/ or mlx/
    
    # from models.gta1 import GTA1Model

    models = [
        # === ComputerAgent model strings ===
        "openai/computer-use-preview",
        "anthropic/claude-opus-4-20250514",
        # f"{local_provider}HelloKKMe/GTA1-7B",
        # f"{local_provider}HelloKKMe/GTA1-32B",
        "openai/computer-use-preview+openai/gpt-4o-mini",
        "anthropic/claude-opus-4-20250514+openai/gpt-4o-mini",
        
        # === Reference model classes ===
        # GTA1Model("HelloKKMe/GTA1-7B"),
        # GTA1Model("HelloKKMe/GTA1-32B"), 
    ]
    
    return models


def is_click_in_bbox(click_coords: Optional[Tuple[int, int]], bbox: List[int]) -> bool:
    """
    Check if click coordinates are within the bounding box.
    
    Args:
        click_coords: (x, y) coordinates or None
        bbox: [x1, y1, x2, y2] bounding box
        
    Returns:
        True if click is within bbox, False otherwise
    """
    if click_coords is None:
        return False
    
    x, y = click_coords
    x1, y1, x2, y2 = bbox
    
    return x1 <= x <= x2 and y1 <= y <= y2


def image_to_base64(image: Image.Image) -> str:
    """
    Convert PIL Image to base64 string.
    
    Args:
        image: PIL Image
        
    Returns:
        Base64 encoded image string
    """
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()


class ModelWrapper:
    """
    Wrapper to provide unified interface for both ComputerAgent and custom models.
    """
    
    def __init__(self, model: Union[str, ModelProtocol]):
        self.model = model
        self.is_computer_agent = isinstance(model, str)
        self.agent: Optional[ComputerAgent] = None
        self.vram_usage_history: List[float] = []  # Track VRAM usage over time
        
        if self.is_computer_agent:
            self.model_name = str(model)
        else:
            self.model_name = f"{model.__class__.__name__}('{getattr(model, 'model_name', 'unknown')}')"
    
    async def load_model(self) -> None:
        """Load the model."""
        if self.is_computer_agent:
            self.agent = ComputerAgent(model=str(self.model))
        else:
            await self.model.load_model() # type: ignore
        
        # Record initial VRAM usage after loading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])
    
    async def unload_model(self) -> None:
        """Unload the model."""
        if not self.is_computer_agent:
            await self.model.unload_model() # type: ignore
        else:
            del self.agent
            self.agent = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        
        # Record VRAM usage after unloading
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])
    
    def get_vram_stats(self) -> dict:
        """Get VRAM usage statistics for this model."""
        if not self.vram_usage_history:
            return {'max_mb': 0.0, 'avg_mb': 0.0}
        
        return {
            'max_mb': max(self.vram_usage_history),
            'avg_mb': sum(self.vram_usage_history) / len(self.vram_usage_history)
        }

    
    async def predict_click(self, image: Image.Image, instruction: str) -> Optional[Tuple[int, int]]:
        """Predict click coordinates."""
        # Record VRAM usage before prediction
        vram_info = get_vram_usage()
        self.vram_usage_history.append(vram_info['allocated_mb'])
        
        if self.is_computer_agent:
            if self.agent is None:
                await self.load_model()
            
            if self.agent is not None:
                image_b64 = image_to_base64(image)
                result = await self.agent.predict_click(instruction=instruction, image_b64=image_b64)
                
                # Record VRAM usage after prediction
                vram_info = get_vram_usage()
                self.vram_usage_history.append(vram_info['allocated_mb'])
                
                return result
            return None
        else:
            result = await self.model.predict_click(image, instruction) # type: ignore
            
            # Record VRAM usage after prediction
            vram_info = get_vram_usage()
            self.vram_usage_history.append(vram_info['allocated_mb'])
            
            return result


def save_results_to_markdown(all_results: List[dict], output_file: str = "screenspot_pro_results.md", title: str = "ScreenSpot-Pro Benchmark Results") -> None:
    """
    Save evaluation results to a markdown table.
    
    Args:
        all_results: List of evaluation results for each model
        output_file: Output markdown file path
        title: Title used for the top-level heading of the report
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(f"# {title}\n\n")
        f.write(f"**Evaluation Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        
        # Summary table
        f.write("## Summary\n\n")
        f.write("| Model | Total Samples | Correct | Errors | Accuracy | Error Rate | Avg Time (s) | Median Time (s) | Time Range (s) | VRAM Max (GB) | VRAM Avg (GB) |\n")
        f.write("|-------|---------------|---------|--------|----------|------------|--------------|-----------------|----------------|---------------|---------------|\n")
        
        for result in all_results:
            model_name = result['model_name']
            total = result['total_samples']
            correct = result['correct_predictions']
            errors = result['failed_predictions']
            accuracy = result['accuracy'] * 100
            error_rate = result['failure_rate'] * 100
            avg_time = result.get('avg_prediction_time', 0.0)
            median_time = result.get('median_prediction_time', 0.0)
            min_time = result.get('min_prediction_time', 0.0)
            max_time = result.get('max_prediction_time', 0.0)
            time_range = f"{min_time:.2f} - {max_time:.2f}"
            vram_max = result.get('vram_max_mb', 0.0) / 1024
            vram_avg = result.get('vram_avg_mb', 0.0) / 1024
            
            f.write(f"| {model_name} | {total} | {correct} | {errors} | {accuracy:.2f}% | {error_rate:.2f}% | {avg_time:.2f} | {median_time:.2f} | {time_range} | {vram_max:.1f} | {vram_avg:.1f} |\n")
        
        # Detailed results for each model
        for result in all_results:
            f.write(f"\n## {result['model_name']} - Detailed Results\n\n")
            f.write("| Sample Index | Instruction | BBox | Predicted | Correct | Error | Time (s) |\n")
            f.write("|-----------|-------------|------|-----------|---------|-------|----------|\n")
            
            for sample_result in result['results'][:10]:  # Show first 10 samples
                sample_idx = sample_result['sample_idx']
                instruction = sample_result['instruction'][:50] + "..." if len(sample_result['instruction']) > 50 else sample_result['instruction']
                bbox = str(sample_result['bbox'])
                predicted = str(sample_result['predicted_coords']) if sample_result['predicted_coords'] else "None"
                correct = "PASS" if sample_result['is_correct'] else "FAIL"
                error = "YES" if sample_result['failed'] else "NO"
                pred_time = sample_result.get('prediction_time', 0.0)
                
                f.write(f"| {sample_idx} | {instruction} | {bbox} | {predicted} | {correct} | {error} | {pred_time:.2f} |\n")
            
            if len(result['results']) > 10:
                f.write(f"\n*Showing first 10 of {len(result['results'])} samples*\n")
    
    print(f"\nResults saved to: {output_file}")


def save_visualizations(all_results: List[dict], samples, output_dir: str = "output") -> None:
    """
    Save visualizations of predicted coordinates vs bboxes to an output folder.
    
    Args:
        all_results: List of evaluation results for each model
        samples: List of sample dicts with image, bbox, instruction keys
        output_dir: Output directory path
    """
    os.makedirs(output_dir, exist_ok=True)
    
    for result in all_results:
        model_name = result['model_name'].replace('/', '_').replace('\\', '_')
        model_dir = os.path.join(output_dir, model_name)
        os.makedirs(model_dir, exist_ok=True)
        
        print(f"Saving visualizations for {result['model_name']}...")
        
        # Save first 10 samples for visualization
        for i, sample_result in enumerate(tqdm(result['results'][:10], desc=f"Saving {model_name} visualizations")):
            # Get sample data using index
            sample_idx = sample_result['sample_idx']
            
            if sample_idx < len(samples):
                sample = samples[sample_idx]
                image = sample['image'].copy()  # Make a copy to avoid modifying original
            else:
                print(f"Warning: Could not find sample at index {sample_idx}")
                continue
            
            bbox = sample_result['bbox']
            predicted_coords = sample_result['predicted_coords']
            is_correct = sample_result['is_correct']
            
            # Draw on image
            draw = ImageDraw.Draw(image)
            
            # Draw bounding box (ground truth) in green
            x1, y1, x2, y2 = bbox
            draw.rectangle([x1, y1, x2, y2], outline="green", width=3)
            draw.text((x1, y1-20), "Ground Truth", fill="green")
            
            # Draw predicted click in red or blue
            if predicted_coords is not None:
                px, py = predicted_coords
                color = "blue" if is_correct else "red"
                # Draw crosshair
                crosshair_size = 15
                draw.line([(px-crosshair_size, py), (px+crosshair_size, py)], fill=color, width=3)
                draw.line([(px, py-crosshair_size), (px, py+crosshair_size)], fill=color, width=3)
                draw.text((px+10, py-20), f"Predicted ({px},{py})", fill=color)
            
            # Add status text
            status = "CORRECT" if is_correct else "INCORRECT"
            status_color = "blue" if is_correct else "red"
            draw.text((10, 10), f"Status: {status}", fill=status_color)
            draw.text((10, 30), f"Instruction: {sample_result['instruction'][:50]}...", fill="black")
            
            # Save image
            filename = f"sample_{i+1:02d}_idx{sample_idx}_{status.lower()}.png"
            filepath = os.path.join(model_dir, filename)
            image.save(filepath)
        
        print(f"Visualizations saved to: {model_dir}")


def save_prediction_visualization(image: Image.Image, instruction: str, predictions: List[dict], 
                                output_file: str = "interactive_prediction.png") -> None:
    """
    Save visualization of multiple model predictions on a single image.
    
    Args:
        image: PIL Image to visualize
        instruction: Instruction text
        predictions: List of prediction dicts with keys: model_name, coords, error
        output_file: Output file path
    """
    # Create a copy of the image
    vis_image = image.copy()
    draw = ImageDraw.Draw(vis_image)
    
    # Colors for different models
    colors = ["red", "blue", "orange", "purple", "brown", "pink", "gray", "olive"]
    
    # Draw predictions
    for i, pred in enumerate(predictions):
        color = colors[i % len(colors)]
        model_name = pred['model_name']
        coords = pred.get('coords')
        error = pred.get('error')
        
        if coords is not None:
            px, py = coords
            # Draw crosshair
            crosshair_size = 20
            draw.line([(px-crosshair_size, py), (px+crosshair_size, py)], fill=color, width=4)
            draw.line([(px, py-crosshair_size), (px, py+crosshair_size)], fill=color, width=4)
            # Draw model name
            draw.text((px+15, py+15), f"{model_name}: ({px},{py})", fill=color)
        else:
            # Draw error text
            draw.text((10, 50 + i*20), f"{model_name}: ERROR - {error}", fill=color)
    
    # Add instruction at the top
    draw.text((10, 10), f"Instruction: {instruction}", fill="black")
    
    # Save image
    vis_image.save(output_file)
    print(f"Prediction visualization saved to: {output_file}")


def take_screenshot() -> Image.Image:
    """
    Take a screenshot of the current screen.
    
    Returns:
        PIL Image of the screenshot
    """
    try:
        import pyautogui
        screenshot = pyautogui.screenshot()
        return screenshot
    except ImportError:
        print("pyautogui not installed. Please install it with: pip install pyautogui")
        raise
    except Exception as e:
        print(f"Error taking screenshot: {e}")
        raise


```
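
A minimal usage sketch for the helpers above (not part of the repository). It assumes the snippet sits next to `benchmarks/utils.py`, that a screenshot exists at the placeholder path, and that the bounding box is illustrative:

```python
import asyncio

from PIL import Image

from utils import ModelWrapper, get_available_models, is_click_in_bbox


async def main() -> None:
    model = get_available_models()[0]  # e.g. "openai/computer-use-preview"
    wrapper = ModelWrapper(model)
    await wrapper.load_model()
    try:
        image = Image.open("screenshot.png")  # placeholder screenshot path
        coords = await wrapper.predict_click(image, "Click the Submit button")
        print("predicted:", coords)
        # Ground-truth bbox [x1, y1, x2, y2] would come from the benchmark dataset.
        print("hit:", is_click_in_bbox(coords, [100, 200, 300, 260]))
        print("vram:", wrapper.get_vram_stats())
    finally:
        await wrapper.unload_model()


if __name__ == "__main__":
    asyncio.run(main())
```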

--------------------------------------------------------------------------------
/libs/python/agent/agent/callbacks/trajectory_saver.py:
--------------------------------------------------------------------------------

```python
"""
Trajectory saving callback handler for ComputerAgent.
"""

import os
import json
import uuid
from datetime import datetime
import base64
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, override
from PIL import Image, ImageDraw
import io
from copy import deepcopy

from .base import AsyncCallbackHandler

def sanitize_image_urls(data: Any) -> Any:
    """
    Recursively search for 'image_url' keys and set their values to '[omitted]'.
    
    Args:
        data: Any data structure (dict, list, or primitive type)
        
    Returns:
        A deep copy of the data with all 'image_url' values replaced with '[omitted]'
    """
    if isinstance(data, dict):
        # Create a copy of the dictionary
        sanitized = {}
        for key, value in data.items():
            if key == "image_url":
                sanitized[key] = "[omitted]"
            else:
                # Recursively sanitize the value
                sanitized[key] = sanitize_image_urls(value)
        return sanitized
    
    elif isinstance(data, list):
        # Recursively sanitize each item in the list
        return [sanitize_image_urls(item) for item in data]
    
    else:
        # For primitive types (str, int, bool, None, etc.), return as-is
        return data
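
# Example (illustrative): sanitize_image_urls keeps trajectory JSON small by
# replacing inline screenshots, e.g.
#   sanitize_image_urls({"output": {"image_url": "data:image/png;base64,AAAA"}})
#   -> {"output": {"image_url": "[omitted]"}}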


def extract_computer_call_outputs(items: List[Dict[str, Any]], screenshot_dir: Optional[Path]) -> List[Dict[str, Any]]:
    """
    Save any base64-encoded screenshots from computer_call_output entries to files and
    replace their image_url with the saved file path when a call_id is present.

    Only operates if screenshot_dir is provided and exists; otherwise returns items unchanged.

    Args:
        items: List of message/result dicts potentially containing computer_call_output entries
        screenshot_dir: Directory to write screenshots into

    Returns:
        A new list with updated image_url fields when applicable.
    """
    if not items:
        return items
    if not screenshot_dir or not screenshot_dir.exists():
        return items

    updated: List[Dict[str, Any]] = []
    for item in items:
        # work on a shallow copy; deep copy nested 'output' if we modify it
        msg = dict(item)
        try:
            if msg.get("type") == "computer_call_output":
                call_id = msg.get("call_id")
                output = msg.get("output", {})
                image_url = output.get("image_url")
                if call_id and isinstance(image_url, str) and image_url.startswith("data:"):
                    # derive extension from MIME type e.g. data:image/png;base64,
                    try:
                        ext = image_url.split(";", 1)[0].split("/")[-1]
                        if not ext:
                            ext = "png"
                    except Exception:
                        ext = "png"
                    out_path = screenshot_dir / f"{call_id}.{ext}"
                    # write file if it doesn't exist
                    if not out_path.exists():
                        try:
                            b64_payload = image_url.split(",", 1)[1]
                            img_bytes = base64.b64decode(b64_payload)
                            out_path.parent.mkdir(parents=True, exist_ok=True)
                            with open(out_path, "wb") as f:
                                f.write(img_bytes)
                        except Exception:
                            # if anything fails, skip modifying this message
                            pass
                    # update image_url to file path
                    new_output = dict(output)
                    new_output["image_url"] = str(out_path)
                    msg["output"] = new_output
        except Exception:
            # do not block on malformed entries; keep original
            pass
        updated.append(msg)
    return updated

class TrajectorySaverCallback(AsyncCallbackHandler):
    """
    Callback handler that saves agent trajectories to disk.
    
    Saves each run as a separate trajectory with unique ID, and each turn
    within the trajectory gets its own folder with screenshots and responses.
    """
    
    def __init__(self, trajectory_dir: str, reset_on_run: bool = True, screenshot_dir: Optional[str] = None):
        """
        Initialize trajectory saver.
        
        Args:
            trajectory_dir: Base directory to save trajectories
            reset_on_run: If True, reset trajectory_id/turn/artifact on each run.
                         If False, continue using existing trajectory_id if set.
            screenshot_dir: Optional directory for screenshots extracted from
                            messages/new_items; if None, screenshots are left inline.
        """
        self.trajectory_dir = Path(trajectory_dir)
        self.trajectory_id: Optional[str] = None
        self.current_turn: int = 0
        self.current_artifact: int = 0
        self.model: Optional[str] = None
        self.total_usage: Dict[str, Any] = {}
        self.reset_on_run = reset_on_run
        # Optional directory to store extracted screenshots from metadata/new_items
        self.screenshot_dir: Optional[Path] = Path(screenshot_dir) if screenshot_dir else None
        
        # Ensure trajectory directory exists
        self.trajectory_dir.mkdir(parents=True, exist_ok=True)

    def _get_turn_dir(self) -> Path:
        """Get the directory for the current turn."""
        if not self.trajectory_id:
            raise ValueError("Trajectory not initialized - call _on_run_start first")
        
        # format: trajectory_id/turn_000
        turn_dir = self.trajectory_dir / self.trajectory_id / f"turn_{self.current_turn:03d}"
        turn_dir.mkdir(parents=True, exist_ok=True)
        return turn_dir

    def _save_artifact(self, name: str, artifact: Union[str, bytes, Dict[str, Any]]) -> None:
        """Save an artifact to the current turn directory."""
        turn_dir = self._get_turn_dir()
        if isinstance(artifact, bytes):
            # format: turn_000/0000_name.png
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.png"
            with open(artifact_path, "wb") as f:
                f.write(artifact)
        else:
            # format: turn_000/0000_name.json
            artifact_filename = f"{self.current_artifact:04d}_{name}"
            artifact_path = turn_dir / f"{artifact_filename}.json"
            # add created_at
            if isinstance(artifact, dict):
                artifact = artifact.copy()
                artifact["created_at"] = str(uuid.uuid1().time)
            with open(artifact_path, "w") as f:
                json.dump(sanitize_image_urls(artifact), f, indent=2)
        self.current_artifact += 1

    def _update_usage(self, usage: Dict[str, Any]) -> None:
        """Update total usage statistics."""
        def add_dicts(target: Dict[str, Any], source: Dict[str, Any]) -> None:
            for key, value in source.items():
                if isinstance(value, dict):
                    if key not in target:
                        target[key] = {}
                    add_dicts(target[key], value)
                else:
                    if key not in target:
                        target[key] = 0
                    target[key] += value
        add_dicts(self.total_usage, usage)
    
    @override
    async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
        """Initialize trajectory tracking for a new run."""
        model = kwargs.get("model", "unknown")
        
        # Only reset trajectory state if reset_on_run is True or no trajectory exists
        if self.reset_on_run or not self.trajectory_id:
            model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
            if "+" in model:
                model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
            # strip non-alphanumeric characters from model_name_short
            model_name_short = ''.join(c for c in model_name_short if c.isalnum() or c == '_')

            # id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
            now = datetime.now()
            self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
            self.current_turn = 0
            self.current_artifact = 0
            self.model = model
            self.total_usage = {}
            
            # Create trajectory directory
            trajectory_path = self.trajectory_dir / self.trajectory_id
            trajectory_path.mkdir(parents=True, exist_ok=True)
            
            # Save trajectory metadata (optionally extract screenshots to screenshot_dir)
            kwargs_to_save = kwargs.copy()
            try:
                if "messages" in kwargs_to_save:
                    kwargs_to_save["messages"] = extract_computer_call_outputs(
                        kwargs_to_save["messages"], self.screenshot_dir
                    )
            except Exception:
                # If extraction fails, fall back to original messages
                pass
            metadata = {
                "trajectory_id": self.trajectory_id,
                "created_at": str(uuid.uuid1().time),
                "status": "running",
                "kwargs": kwargs_to_save,
            }
            
            with open(trajectory_path / "metadata.json", "w") as f:
                json.dump(metadata, f, indent=2)
        else:
            # Continue with existing trajectory - just update model if needed
            self.model = model

    @override
    async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None:
        """Finalize run tracking by updating metadata with completion status, usage, and new items."""
        if not self.trajectory_id:
            return
        
        # Update metadata with completion status, total usage, and new items
        trajectory_path = self.trajectory_dir / self.trajectory_id
        metadata_path = trajectory_path / "metadata.json"
        
        # Read existing metadata
        if metadata_path.exists():
            with open(metadata_path, "r") as f:
                metadata = json.load(f)
        else:
            metadata = {}
        
        # Update metadata with completion info
        # Optionally extract screenshots from new_items before persisting
        new_items_to_save = new_items
        try:
            new_items_to_save = extract_computer_call_outputs(new_items, self.screenshot_dir)
        except Exception:
            pass

        metadata.update({
            "status": "completed",
            "completed_at": str(uuid.uuid1().time),
            "total_usage": self.total_usage,
            "new_items": new_items_to_save,
            "total_turns": self.current_turn
        })
        
        # Save updated metadata
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2)
    
    @override 
    async def on_api_start(self, kwargs: Dict[str, Any]) -> None:
        if not self.trajectory_id:
            return
        
        self._save_artifact("api_start", { "kwargs": kwargs })
    
    @override
    async def on_api_end(self, kwargs: Dict[str, Any], result: Any) -> None:
        """Save API call result."""
        if not self.trajectory_id:
            return
        
        self._save_artifact("api_result", { "kwargs": kwargs, "result": result })

    @override
    async def on_screenshot(self, screenshot: Union[str, bytes], name: str = "screenshot") -> None:
        """Save a screenshot."""
        if isinstance(screenshot, str):
            screenshot = base64.b64decode(screenshot)
        self._save_artifact(name, screenshot)

    @override
    async def on_usage(self, usage: Dict[str, Any]) -> None:
        """Called when usage information is received."""
        self._update_usage(usage)

    @override
    async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
        """Save responses to the current turn directory and update usage statistics."""
        if not self.trajectory_id:
            return
        
        # Save responses
        turn_dir = self._get_turn_dir()
        response_data = {
            "timestamp": str(uuid.uuid1().time),
            "model": self.model,
            "kwargs": kwargs,
            "response": responses
        }
        
        self._save_artifact("agent_response", response_data)
        
        # Increment turn counter
        self.current_turn += 1

    def _draw_crosshair_on_image(self, image_bytes: bytes, x: int, y: int) -> bytes:
        """
        Draw a red dot and crosshair at the specified coordinates on the image.
        
        Args:
            image_bytes: The original image as bytes
            x: X coordinate for the crosshair
            y: Y coordinate for the crosshair
            
        Returns:
            Modified image as bytes with red dot and crosshair
        """
        # Open the image
        image = Image.open(io.BytesIO(image_bytes))
        draw = ImageDraw.Draw(image)
        
        # Draw crosshair lines (red, 2px thick)
        crosshair_size = 20
        line_width = 2
        color = "red"
        
        # Horizontal line
        draw.line([(x - crosshair_size, y), (x + crosshair_size, y)], fill=color, width=line_width)
        # Vertical line
        draw.line([(x, y - crosshair_size), (x, y + crosshair_size)], fill=color, width=line_width)
        
        # Draw center dot (filled circle)
        dot_radius = 3
        draw.ellipse([(x - dot_radius, y - dot_radius), (x + dot_radius, y + dot_radius)], fill=color)
        
        # Convert back to bytes
        output = io.BytesIO()
        image.save(output, format='PNG')
        return output.getvalue()

    @override
    async def on_computer_call_end(self, item: Dict[str, Any], result: List[Dict[str, Any]]) -> None:
        """
        Called when a computer call has completed.
        Saves screenshots and computer call output.
        """
        if not self.trajectory_id:
            return
        
        self._save_artifact("computer_call_result", { "item": item, "result": result })
        
        # Check if action has x/y coordinates and there's a screenshot in the result
        action = item.get("action", {})
        if "x" in action and "y" in action:
            # Look for screenshot in the result
            for result_item in result:
                if (result_item.get("type") == "computer_call_output" and 
                    result_item.get("output", {}).get("type") == "input_image"):
                    
                    image_url = result_item["output"]["image_url"]
                
                    # Extract base64 image data
                    if image_url.startswith("data:image/"):
                        # Format: data:image/png;base64,<base64_data>
                        base64_data = image_url.split(",", 1)[1]
                    else:
                        # Assume it's just base64 data
                        base64_data = image_url
                    
                    try:
                        # Decode the image
                        image_bytes = base64.b64decode(base64_data)
                        
                        # Draw crosshair at the action coordinates
                        annotated_image = self._draw_crosshair_on_image(
                            image_bytes, 
                            int(action["x"]), 
                            int(action["y"])
                        )
                        
                        # Save as screenshot_action
                        self._save_artifact("screenshot_action", annotated_image)
                        
                    except Exception as e:
                        # If annotation fails, just log and continue
                        print(f"Failed to annotate screenshot: {e}")
                    
                    break  # Only process the first screenshot found

        # Increment turn counter
        self.current_turn += 1
```
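
A minimal sketch (not part of the repository) that drives the callback's hooks directly to illustrate the on-disk layout it produces (`<trajectory_dir>/<trajectory_id>/metadata.json` plus numbered artifacts under `turn_000/`). In normal use `ComputerAgent` invokes these hooks; the kwargs, usage, and response values below are placeholders:

```python
import asyncio

from agent.callbacks.trajectory_saver import TrajectorySaverCallback


async def main() -> None:
    saver = TrajectorySaverCallback("trajectories")
    kwargs = {"model": "anthropic/claude-opus-4-20250514", "messages": []}

    await saver.on_run_start(kwargs, old_items=[])
    await saver.on_api_start({"prompt": "click the Submit button"})       # 0000_api_start.json
    await saver.on_usage({"input_tokens": 10, "output_tokens": 5})
    await saver.on_responses(kwargs, {"output": "placeholder response"})  # 0001_agent_response.json, closes turn_000
    await saver.on_run_end(kwargs, old_items=[], new_items=[])

    print("trajectory id:", saver.trajectory_id)
    print("accumulated usage:", saver.total_usage)


if __name__ == "__main__":
    asyncio.run(main())
```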

--------------------------------------------------------------------------------
/tests/test_files.py:
--------------------------------------------------------------------------------

```python
"""
File System Interface Tests
Tests for the file system methods of the Computer interface.
By default the `computer` fixture below targets a Cua cloud Linux container.
Required environment variables:
- CUA_API_KEY: API key for Cua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""

import os
import asyncio
import pytest
from pathlib import Path
import sys
import traceback

# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")

from computer import Computer, VMProviderType

@pytest.fixture(scope="session")
async def computer():
    """Shared Computer instance for all test cases."""
    # Create a remote Linux computer with Cua
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    
    # Create a local macOS computer with Cua
    # computer = Computer()
    
    # Connect to host computer
    # computer = Computer(use_host_computer_server=True)
    
    try:
        await computer.run()
        yield computer
    finally:
        await computer.disconnect()

@pytest.mark.asyncio(loop_scope="session")
async def test_file_exists(computer):
    tmp_path = "test_file_exists.txt"
    # Ensure file does not exist
    if await computer.interface.file_exists(tmp_path):
        await computer.interface.delete_file(tmp_path)
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is False, f"File {tmp_path} should not exist"
    # Create file and check again
    await computer.interface.write_text(tmp_path, "hello")
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is True, f"File {tmp_path} should exist"
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_directory_exists(computer):
    tmp_dir = "test_directory_exists"
    if await computer.interface.directory_exists(tmp_dir):
        # Remove all files in directory before removing directory
        files = await computer.interface.list_dir(tmp_dir)
        for fname in files:
            await computer.interface.delete_file(f"{tmp_dir}/{fname}")
        # Remove the directory itself
        await computer.interface.delete_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is False, f"Directory {tmp_dir} should not exist"
    await computer.interface.create_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is True, f"Directory {tmp_dir} should exist"
    # Cleanup: remove files and directory
    files = await computer.interface.list_dir(tmp_dir)
    for fname in files:
        await computer.interface.delete_file(f"{tmp_dir}/{fname}")
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_list_dir(computer):
    tmp_dir = "test_list_dir"
    if not await computer.interface.directory_exists(tmp_dir):
        await computer.interface.create_dir(tmp_dir)
    files = ["foo.txt", "bar.txt"]
    for fname in files:
        await computer.interface.write_text(f"{tmp_dir}/{fname}", "hi")
    result = await computer.interface.list_dir(tmp_dir)
    assert set(result) >= set(files), f"Directory {tmp_dir} should contain files {files}"
    for fname in files:
        await computer.interface.delete_file(f"{tmp_dir}/{fname}")
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text(computer):
    tmp_path = "test_rw_text.txt"
    content = "sample text"
    await computer.interface.write_text(tmp_path, content)
    read = await computer.interface.read_text(tmp_path)
    assert read == content, "File content should match"
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_delete_file(computer):
    tmp_path = "test_delete_file.txt"
    await computer.interface.write_text(tmp_path, "bye")
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is True, "File should exist"
    await computer.interface.delete_file(tmp_path)
    exists = await computer.interface.file_exists(tmp_path)
    assert exists is False, "File should not exist"


@pytest.mark.asyncio(loop_scope="session")
async def test_create_dir(computer):
    tmp_dir = "test_create_dir"
    if await computer.interface.directory_exists(tmp_dir):
        await computer.interface.delete_dir(tmp_dir)
    await computer.interface.create_dir(tmp_dir)
    exists = await computer.interface.directory_exists(tmp_dir)
    assert exists is True, "Directory should exist"
    await computer.interface.delete_dir(tmp_dir)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_basic(computer):
    """Test basic read_bytes functionality."""
    tmp_path = "test_read_bytes.bin"
    test_data = b"Hello, World! This is binary data \x00\x01\x02\x03"
    
    # Write binary data via write_text by decoding through latin-1 (1:1 byte mapping)
    await computer.interface.write_text(tmp_path, test_data.decode('latin-1'))
    
    # Read all bytes
    read_data = await computer.interface.read_bytes(tmp_path)
    assert read_data == test_data, "Binary data should match"
    
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_bytes_with_offset_and_length(computer):
    """Test read_bytes with offset and length parameters."""
    tmp_path = "test_read_bytes_offset.bin"
    test_data = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    
    # Write test data
    await computer.interface.write_text(tmp_path, test_data.decode('latin-1'))
    
    # Test reading with offset only
    read_data = await computer.interface.read_bytes(tmp_path, offset=5)
    expected = test_data[5:]
    assert read_data == expected, f"Data from offset 5 should match. Got: {read_data}, Expected: {expected}"
    
    # Test reading with offset and length
    read_data = await computer.interface.read_bytes(tmp_path, offset=10, length=5)
    expected = test_data[10:15]
    assert read_data == expected, f"Data from offset 10, length 5 should match. Got: {read_data}, Expected: {expected}"
    
    # Test reading from beginning with length
    read_data = await computer.interface.read_bytes(tmp_path, offset=0, length=10)
    expected = test_data[:10]
    assert read_data == expected, f"Data from beginning, length 10 should match. Got: {read_data}, Expected: {expected}"
    
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_get_file_size(computer):
    """Test get_file_size functionality."""
    tmp_path = "test_file_size.txt"
    test_content = "A" * 1000  # 1000 bytes
    
    await computer.interface.write_text(tmp_path, test_content)
    
    file_size = await computer.interface.get_file_size(tmp_path)
    assert file_size == 1000, f"File size should be 1000 bytes, got {file_size}"
    
    await computer.interface.delete_file(tmp_path)


@pytest.mark.asyncio(loop_scope="session")
async def test_read_large_file(computer):
    """Test reading a file larger than 10MB to verify chunked reading."""
    tmp_path = "test_large_file.bin"
    
    # Create a file larger than 10MB (10 * 1024 * 1024 = 10,485,760 bytes)
    total_size = 12 * 1024 * 1024  # 12MB
    
    print(f"Creating large file of {total_size} bytes ({total_size / (1024*1024):.1f}MB)...")
    
    # Create large file content (this will test the chunked writing functionality)
    large_content = b"X" * total_size
    
    # Write the large file using write_bytes (will automatically use chunked writing)
    await computer.interface.write_bytes(tmp_path, large_content)
    
    # Verify file size
    file_size = await computer.interface.get_file_size(tmp_path)
    assert file_size == total_size, f"Large file size should be {total_size} bytes, got {file_size}"
    
    print(f"Large file created successfully: {file_size} bytes")
    
    # Test reading the entire large file (should use chunked reading)
    print("Reading large file...")
    read_data = await computer.interface.read_bytes(tmp_path)
    assert len(read_data) == total_size, f"Read data size should match file size. Got {len(read_data)}, expected {total_size}"
    
    # Verify content (should be all 'X' characters)
    expected_data = b"X" * total_size
    assert read_data == expected_data, "Large file content should be all 'X' characters"
    
    print("Large file read successfully!")
    
    # Test reading with offset and length on large file
    offset = 5 * 1024 * 1024  # 5MB offset
    length = 2 * 1024 * 1024  # 2MB length
    read_data = await computer.interface.read_bytes(tmp_path, offset=offset, length=length)
    assert len(read_data) == length, f"Partial read size should be {length}, got {len(read_data)}"
    assert read_data == b"X" * length, "Partial read content should be all 'X' characters"
    
    print("Large file partial read successful!")
    
    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Large file test completed successfully!")

@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text_with_encoding(computer):
    """Test reading and writing text files with different encodings."""
    print("Testing text file operations with different encodings...")
    
    tmp_path = "test_encoding.txt"
    
    # Test UTF-8 encoding (default)
    utf8_content = "Hello, 世界! 🌍 Ñoño café"
    await computer.interface.write_text(tmp_path, utf8_content, encoding='utf-8')
    read_utf8 = await computer.interface.read_text(tmp_path, encoding='utf-8')
    assert read_utf8 == utf8_content, "UTF-8 content should match"
    
    # Test ASCII encoding
    ascii_content = "Hello, World! Simple ASCII text."
    await computer.interface.write_text(tmp_path, ascii_content, encoding='ascii')
    read_ascii = await computer.interface.read_text(tmp_path, encoding='ascii')
    assert read_ascii == ascii_content, "ASCII content should match"
    
    # Test Latin-1 encoding
    latin1_content = "Café, naïve, résumé"
    await computer.interface.write_text(tmp_path, latin1_content, encoding='latin-1')
    read_latin1 = await computer.interface.read_text(tmp_path, encoding='latin-1')
    assert read_latin1 == latin1_content, "Latin-1 content should match"
    
    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text encoding test completed successfully!")

@pytest.mark.asyncio(loop_scope="session")
async def test_write_text_append_mode(computer):
    """Test appending text to files."""
    print("Testing text file append mode...")
    
    tmp_path = "test_append.txt"
    
    # Write initial content
    initial_content = "First line\n"
    await computer.interface.write_text(tmp_path, initial_content)
    
    # Append more content
    append_content = "Second line\n"
    await computer.interface.write_text(tmp_path, append_content, append=True)
    
    # Read and verify
    final_content = await computer.interface.read_text(tmp_path)
    expected_content = initial_content + append_content
    assert final_content == expected_content, f"Expected '{expected_content}', got '{final_content}'"
    
    # Append one more line
    third_content = "Third line\n"
    await computer.interface.write_text(tmp_path, third_content, append=True)
    
    # Read and verify final result
    final_content = await computer.interface.read_text(tmp_path)
    expected_content = initial_content + append_content + third_content
    assert final_content == expected_content, f"Expected '{expected_content}', got '{final_content}'"
    
    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text append test completed successfully!")

@pytest.mark.asyncio(loop_scope="session")
async def test_large_text_file(computer):
    """Test reading and writing large text files (>5MB) to verify chunked operations."""
    print("Testing large text file operations...")
    
    tmp_path = "test_large_text.txt"
    
    # Create a large text content (approximately 6MB)
    # Each line is about 100 characters, so 60,000 lines ≈ 6MB
    line_template = "This is line {:06d} with some additional text to make it longer and reach about 100 chars.\n"
    large_content = ""
    num_lines = 60000
    
    print(f"Generating large text content with {num_lines} lines...")
    for i in range(num_lines):
        large_content += line_template.format(i)
    
    content_size_mb = len(large_content.encode('utf-8')) / (1024 * 1024)
    print(f"Generated text content size: {content_size_mb:.2f} MB")
    
    # Write the large text file
    print("Writing large text file...")
    await computer.interface.write_text(tmp_path, large_content)
    
    # Read the entire file back
    print("Reading large text file...")
    read_content = await computer.interface.read_text(tmp_path)
    
    # Verify content matches
    assert read_content == large_content, "Large text file content should match exactly"
    
    # Test partial reading by reading as bytes and decoding specific portions
    print("Testing partial text reading...")
    
    # Read first 1000 characters worth of bytes
    first_1000_chars = large_content[:1000]
    first_1000_bytes = first_1000_chars.encode('utf-8')
    read_bytes = await computer.interface.read_bytes(tmp_path, offset=0, length=len(first_1000_bytes))
    decoded_partial = read_bytes.decode('utf-8')
    assert decoded_partial == first_1000_chars, "Partial text reading should match"
    
    # Test appending to large file
    print("Testing append to large text file...")
    append_text = "\n--- APPENDED CONTENT ---\nThis content was appended to the large file.\n"
    await computer.interface.write_text(tmp_path, append_text, append=True)
    
    # Read and verify appended content
    final_content = await computer.interface.read_text(tmp_path)
    expected_final = large_content + append_text
    assert final_content == expected_final, "Appended large text file should match"
    
    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Large text file test completed successfully!")

@pytest.mark.asyncio(loop_scope="session")
async def test_text_file_edge_cases(computer):
    """Test edge cases for text file operations."""
    print("Testing text file edge cases...")
    
    tmp_path = "test_edge_cases.txt"
    
    # Test empty file
    empty_content = ""
    await computer.interface.write_text(tmp_path, empty_content)
    read_empty = await computer.interface.read_text(tmp_path)
    assert read_empty == empty_content, "Empty file should return empty string"
    
    # Test file with only whitespace
    whitespace_content = "   \n\t\r\n   \n"
    await computer.interface.write_text(tmp_path, whitespace_content)
    read_whitespace = await computer.interface.read_text(tmp_path)
    assert read_whitespace == whitespace_content, "Whitespace content should be preserved"
    
    # Test file with special characters and newlines
    special_content = "Line 1\nLine 2\r\nLine 3\tTabbed\nSpecial: !@#$%^&*()\n"
    await computer.interface.write_text(tmp_path, special_content)
    read_special = await computer.interface.read_text(tmp_path)
    assert read_special == special_content, "Special characters should be preserved"
    
    # Test very long single line (no newlines)
    long_line = "A" * 10000  # 10KB single line
    await computer.interface.write_text(tmp_path, long_line)
    read_long_line = await computer.interface.read_text(tmp_path)
    assert read_long_line == long_line, "Long single line should be preserved"
    
    # Clean up
    await computer.interface.delete_file(tmp_path)
    print("Text file edge cases test completed successfully!")

if __name__ == "__main__":
    # Run tests directly
    pytest.main([__file__, "-v"])

```
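
A minimal sketch (not part of the repository) showing the same file-interface calls outside pytest; it assumes `CUA_API_KEY` and `CUA_CONTAINER_NAME` are set (or loaded from `.env`), as in the tests above:

```python
import asyncio
import os

from computer import Computer, VMProviderType


async def main() -> None:
    computer = Computer(
        os_type="linux",
        api_key=os.getenv("CUA_API_KEY"),
        name=str(os.getenv("CUA_CONTAINER_NAME")),
        provider_type=VMProviderType.CLOUD,
    )
    try:
        await computer.run()
        await computer.interface.write_text("demo.txt", "hello")
        print(await computer.interface.read_text("demo.txt"))      # "hello"
        print(await computer.interface.get_file_size("demo.txt"))  # 5
        await computer.interface.delete_file("demo.txt")
    finally:
        await computer.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```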

--------------------------------------------------------------------------------
/docs/src/app/(home)/[[...slug]]/page.tsx:
--------------------------------------------------------------------------------

```typescript
import { getApiVersions, source } from '@/lib/source';
import { getMDXComponents } from '@/mdx-components';
import { buttonVariants } from 'fumadocs-ui/components/ui/button';
import {
  Popover,
  PopoverContent,
  PopoverTrigger,
} from 'fumadocs-ui/components/ui/popover';
import { createRelativeLink } from 'fumadocs-ui/mdx';
import {
  DocsBody,
  DocsDescription,
  DocsPage,
  DocsTitle,
} from 'fumadocs-ui/page';
import { cn } from 'fumadocs-ui/utils/cn';
import { ChevronDown, CodeXml, ExternalLink } from 'lucide-react';
import type { Metadata } from 'next';
import Link from 'next/link';
import { notFound, redirect } from 'next/navigation';

export default async function Page(props: {
  params: Promise<{ slug?: string[] }>;
}) {
  const params = await props.params;
  const slug = params.slug || [];
  const page = source.getPage(slug);
  if (!page) notFound(); //redirect('/docs');

  // Detect if this is an API reference page: /api/[section] or /api/[section]/[version]
  let apiSection: string | null = null;
  let apiVersionSlug: string[] = [];
  if (slug[0] === 'api' && slug.length >= 2) {
    apiSection = slug[1];
    if (slug.length > 2) {
      apiVersionSlug = slug.slice(2);
    }
  }

  let versionItems: { label: string; slug: string[] }[] = [];
  if (apiSection) {
    versionItems = await getApiVersions(apiSection);
  }

  const macos = page.data.macos;
  const windows = page.data.windows;
  const linux = page.data.linux;
  const pypi = page.data.pypi;
  const npm = page.data.npm;
  const github = page.data.github;

  const MDXContent = page.data.body;

  // Platform icons component
  const PlatformIcons = () => {
    const hasAnyPlatform = macos || windows || linux;
    if (!hasAnyPlatform && !pypi) return null;

    return (
      <div className="flex flex-col gap-2">
        {hasAnyPlatform && (
          <div className="flex flex-row gap-2 items-left dark:text-neutral-400">
            {windows && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 448 512">
                <title>Windows</title>
                <path d="M0 93.7l183.6-25.3v177.4H0V93.7zm0 324.6l183.6 25.3V268.4H0v149.9zm203.8 28L448 480V268.4H203.8v177.9zm0-380.6v180.1H448V32L203.8 65.7z" />
              </svg>
            )}
            {macos && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 384 512">
                <title>macOS</title>
                <path d="M318.7 268.7c-.2-36.7 16.4-64.4 50-84.8-18.8-26.9-47.2-41.7-84.7-44.6-35.5-2.8-74.3 20.7-88.5 20.7-15 0-49.4-19.7-76.4-19.7C63.3 141.2 4 184.8 4 273.5q0 39.3 14.4 81.2c12.8 36.7 59 126.7 107.2 125.2 25.2-.6 43-17.9 75.8-17.9 31.8 0 48.3 17.9 76.4 17.9 48.6-.7 90.4-82.5 102.6-119.3-65.2-30.7-61.7-90-61.7-91.9zm-56.6-164.2c27.3-32.4 24.8-61.9 24-72.5-24.1 1.4-52 16.4-67.9 34.9-17.5 19.8-27.8 44.3-25.6 71.9 26.1 2 49.9-11.4 69.5-34.3z" />
              </svg>
            )}
            {linux && (
              <svg
                xmlns="http://www.w3.org/2000/svg"
                fill="currentColor"
                className="h-5"
                viewBox="0 0 448 512">
                <title>Linux</title>
                <path d="M220.8 123.3c1 .5 1.8 1.7 3 1.7 1.1 0 2.8-.4 2.9-1.5 .2-1.4-1.9-2.3-3.2-2.9-1.7-.7-3.9-1-5.5-.1-.4 .2-.8 .7-.6 1.1 .3 1.3 2.3 1.1 3.4 1.7zm-21.9 1.7c1.2 0 2-1.2 3-1.7 1.1-.6 3.1-.4 3.5-1.6 .2-.4-.2-.9-.6-1.1-1.6-.9-3.8-.6-5.5 .1-1.3 .6-3.4 1.5-3.2 2.9 .1 1 1.8 1.5 2.8 1.4zM420 403.8c-3.6-4-5.3-11.6-7.2-19.7-1.8-8.1-3.9-16.8-10.5-22.4-1.3-1.1-2.6-2.1-4-2.9-1.3-.8-2.7-1.5-4.1-2 9.2-27.3 5.6-54.5-3.7-79.1-11.4-30.1-31.3-56.4-46.5-74.4-17.1-21.5-33.7-41.9-33.4-72C311.1 85.4 315.7 .1 234.8 0 132.4-.2 158 103.4 156.9 135.2c-1.7 23.4-6.4 41.8-22.5 64.7-18.9 22.5-45.5 58.8-58.1 96.7-6 17.9-8.8 36.1-6.2 53.3-6.5 5.8-11.4 14.7-16.6 20.2-4.2 4.3-10.3 5.9-17 8.3s-14 6-18.5 14.5c-2.1 3.9-2.8 8.1-2.8 12.4 0 3.9 .6 7.9 1.2 11.8 1.2 8.1 2.5 15.7 .8 20.8-5.2 14.4-5.9 24.4-2.2 31.7 3.8 7.3 11.4 10.5 20.1 12.3 17.3 3.6 40.8 2.7 59.3 12.5 19.8 10.4 39.9 14.1 55.9 10.4 11.6-2.6 21.1-9.6 25.9-20.2 12.5-.1 26.3-5.4 48.3-6.6 14.9-1.2 33.6 5.3 55.1 4.1 .6 2.3 1.4 4.6 2.5 6.7v.1c8.3 16.7 23.8 24.3 40.3 23 16.6-1.3 34.1-11 48.3-27.9 13.6-16.4 36-23.2 50.9-32.2 7.4-4.5 13.4-10.1 13.9-18.3 .4-8.2-4.4-17.3-15.5-29.7zM223.7 87.3c9.8-22.2 34.2-21.8 44-.4 6.5 14.2 3.6 30.9-4.3 40.4-1.6-.8-5.9-2.6-12.6-4.9 1.1-1.2 3.1-2.7 3.9-4.6 4.8-11.8-.2-27-9.1-27.3-7.3-.5-13.9 10.8-11.8 23-4.1-2-9.4-3.5-13-4.4-1-6.9-.3-14.6 2.9-21.8zM183 75.8c10.1 0 20.8 14.2 19.1 33.5-3.5 1-7.1 2.5-10.2 4.6 1.2-8.9-3.3-20.1-9.6-19.6-8.4 .7-9.8 21.2-1.8 28.1 1 .8 1.9-.2-5.9 5.5-15.6-14.6-10.5-52.1 8.4-52.1zm-13.6 60.7c6.2-4.6 13.6-10 14.1-10.5 4.7-4.4 13.5-14.2 27.9-14.2 7.1 0 15.6 2.3 25.9 8.9 6.3 4.1 11.3 4.4 22.6 9.3 8.4 3.5 13.7 9.7 10.5 18.2-2.6 7.1-11 14.4-22.7 18.1-11.1 3.6-19.8 16-38.2 14.9-3.9-.2-7-1-9.6-2.1-8-3.5-12.2-10.4-20-15-8.6-4.8-13.2-10.4-14.7-15.3-1.4-4.9 0-9 4.2-12.3zm3.3 334c-2.7 35.1-43.9 34.4-75.3 18-29.9-15.8-68.6-6.5-76.5-21.9-2.4-4.7-2.4-12.7 2.6-26.4v-.2c2.4-7.6 .6-16-.6-23.9-1.2-7.8-1.8-15 .9-20 3.5-6.7 8.5-9.1 14.8-11.3 10.3-3.7 11.8-3.4 19.6-9.9 5.5-5.7 9.5-12.9 14.3-18 5.1-5.5 10-8.1 17.7-6.9 8.1 1.2 15.1 6.8 21.9 16l19.6 35.6c9.5 19.9 43.1 48.4 41 68.9zm-1.4-25.9c-4.1-6.6-9.6-13.6-14.4-19.6 7.1 0 14.2-2.2 16.7-8.9 2.3-6.2 0-14.9-7.4-24.9-13.5-18.2-38.3-32.5-38.3-32.5-13.5-8.4-21.1-18.7-24.6-29.9s-3-23.3-.3-35.2c5.2-22.9 18.6-45.2 27.2-59.2 2.3-1.7 .8 3.2-8.7 20.8-8.5 16.1-24.4 53.3-2.6 82.4 .6-20.7 5.5-41.8 13.8-61.5 12-27.4 37.3-74.9 39.3-112.7 1.1 .8 4.6 3.2 6.2 4.1 4.6 2.7 8.1 6.7 12.6 10.3 12.4 10 28.5 9.2 42.4 1.2 6.2-3.5 11.2-7.5 15.9-9 9.9-3.1 17.8-8.6 22.3-15 7.7 30.4 25.7 74.3 37.2 95.7 6.1 11.4 18.3 35.5 23.6 64.6 3.3-.1 7 .4 10.9 1.4 13.8-35.7-11.7-74.2-23.3-84.9-4.7-4.6-4.9-6.6-2.6-6.5 12.6 11.2 29.2 33.7 35.2 59 2.8 11.6 3.3 23.7 .4 35.7 16.4 6.8 35.9 17.9 30.7 34.8-2.2-.1-3.2 0-4.2 0 3.2-10.1-3.9-17.6-22.8-26.1-19.6-8.6-36-8.6-38.3 12.5-12.1 4.2-18.3 14.7-21.4 27.3-2.8 11.2-3.6 24.7-4.4 39.9-.5 7.7-3.6 18-6.8 29-32.1 22.9-76.7 32.9-114.3 7.2zm257.4-11.5c-.9 16.8-41.2 19.9-63.2 46.5-13.2 15.7-29.4 24.4-43.6 25.5s-26.5-4.8-33.7-19.3c-4.7-11.1-2.4-23.1 1.1-36.3 3.7-14.2 9.2-28.8 9.9-40.6 .8-15.2 1.7-28.5 4.2-38.7 2.6-10.3 6.6-17.2 13.7-21.1 .3-.2 .7-.3 1-.5 .8 13.2 7.3 26.6 18.8 29.5 12.6 3.3 30.7-7.5 38.4-16.3 9-.3 15.7-.9 22.6 5.1 9.9 8.5 7.1 30.3 17.1 41.6 10.6 11.6 14 19.5 13.7 24.6zM173.3 148.7c2 1.9 4.7 4.5 8 7.1 6.6 5.2 15.8 10.6 27.3 10.6 11.6 0 22.5-5.9 31.8-10.8 4.9-2.6 10.9-7 14.8-10.4s5.9-6.3 3.1-6.6-2.6 2.6-6 5.1c-4.4 3.2-9.7 7.4-13.9 9.8-7.4 4.2-19.5 10.2-29.9 10.2s-18.7-4.8-24.9-9.7c-3.1-2.5-5.7-5-7.7-6.9-1.5-1.4-1.9-4.6-4.3-4.9-1.4-.1-1.8 3.7 1.7 6.5z" />
              </svg>
            )}
          </div>
        )}

        <div className="flex flex-row gap-2 items-left">
          {pypi && (
            <a
              target="_blank"
              href={`https://pypi.org/project/${pypi}/`}
              rel="noreferrer">
              <img
                src={`https://img.shields.io/pypi/v/${pypi}?color=blue`}
                className="h-5"
                alt="PyPI"
              />
            </a>
          )}
          {npm && (
            <a
              target="_blank"
              href={`https://www.npmjs.com/package/${npm}`}
              rel="noreferrer">
              <img
                src={`https://img.shields.io/npm/v/${npm}?color=bf4c4b`}
                className="h-5"
                alt="NPM"
              />
            </a>
          )}
        </div>
      </div>
    );
  };

  const tocHeader = () => {
    return (
      <div className="w-fit">
        <PlatformIcons />
        <div className="flex gap-2 mt-2">
          {github &&
            github.length > 0 &&
            (github.length === 1 ? (
              <a
                href={github[0]}
                rel="noreferrer noopener"
                target="_blank"
                className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&amp;_svg]:size-5 text-fd-muted-foreground md:[&amp;_svg]:size-4.5"
                aria-label="Source"
                data-active="false">
                <svg role="img" viewBox="0 0 24 24" fill="currentColor">
                  <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
                </svg>
                Source
                <ExternalLink className="w-4 h-4 ml-auto" />
              </a>
            ) : (
              <Popover>
                <PopoverTrigger className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5">
                  <svg role="img" viewBox="0 0 24 24" fill="currentColor">
                    <path d="M12 .297c-6.63 0-12 5.373-12 12 0 5.303 3.438 9.8 8.205 11.385.6.113.82-.258.82-.577 0-.285-.01-1.04-.015-2.04-3.338.724-4.042-1.61-4.042-1.61C4.422 18.07 3.633 17.7 3.633 17.7c-1.087-.744.084-.729.084-.729 1.205.084 1.838 1.236 1.838 1.236 1.07 1.835 2.809 1.305 3.495.998.108-.776.417-1.305.76-1.605-2.665-.3-5.466-1.332-5.466-5.93 0-1.31.465-2.38 1.235-3.22-.135-.303-.54-1.523.105-3.176 0 0 1.005-.322 3.3 1.23.96-.267 1.98-.399 3-.405 1.02.006 2.04.138 3 .405 2.28-1.552 3.285-1.23 3.285-1.23.645 1.653.24 2.873.12 3.176.765.84 1.23 1.91 1.23 3.22 0 4.61-2.805 5.625-5.475 5.92.42.36.81 1.096.81 2.22 0 1.606-.015 2.896-.015 3.286 0 .315.21.69.825.57C20.565 22.092 24 17.592 24 12.297c0-6.627-5.373-12-12-12"></path>
                  </svg>
                  Source
                  <ChevronDown className="h-4 w-4" />
                </PopoverTrigger>
                <PopoverContent className="w-48 p-1">
                  <div className="flex flex-col gap-1">
                    {github.map((link, index) => (
                      <a
                        key={index}
                        href={link}
                        rel="noreferrer noopener"
                        target="_blank"
                        className="inline-flex gap-2 w-full items-center rounded-md p-2 text-sm hover:bg-fd-accent hover:text-fd-accent-foreground">
                        {link.includes('python')
                          ? 'Python'
                          : link.includes('typescript')
                            ? 'TypeScript'
                            : `Source ${index + 1}`}
                        <ExternalLink className="w-4 h-4 ml-auto" />
                      </a>
                    ))}
                  </div>
                </PopoverContent>
              </Popover>
            ))}
          {slug.includes('libraries') && (
            <a
              className="inline-flex gap-2 w-fit items-center justify-center rounded-md text-sm font-medium transition-colors duration-100 disabled:pointer-events-none disabled:opacity-50 focus-visible:outline-none hover:bg-fd-accent hover:text-fd-accent-foreground p-1.5 [&_svg]:size-5 text-fd-muted-foreground md:[&_svg]:size-4.5"
              href={`/api/${page.data.title.toLowerCase()}`}>
              <CodeXml size={12} />
              Reference
            </a>
          )}
        </div>
        <hr className="my-2 border-t border-fd-border" />
      </div>
    );
  };

  return (
    <DocsPage
      toc={page.data.toc}
      tableOfContent={{ header: tocHeader() }}
      full={page.data.full}>
      <div className="flex flex-row w-full items-start">
        <div className="flex-1">
          <div className="flex flex-row w-full">
            <DocsTitle>{page.data.title}</DocsTitle>

            <div className="ml-auto">
              {apiSection && versionItems.length > 1 && (
                <Popover>
                  <PopoverTrigger
                    className={cn(
                      buttonVariants({
                        color: 'secondary',
                        size: 'sm',
                        className: 'gap-2',
                      })
                    )}>
                    {(() => {
                      // Find the current version label
                      let currentLabel = 'Current';
                      if (apiVersionSlug.length > 0) {
                        const found = versionItems.find(
                          (item) =>
                            item.label !== 'Current' &&
                            apiVersionSlug[0] === item.label
                        );
                        if (found) currentLabel = found.label;
                      }
                      return (
                        <>
                          API Version: {currentLabel}
                          <ChevronDown className="size-3.5 text-fd-muted-foreground" />
                        </>
                      );
                    })()}
                  </PopoverTrigger>
                  <PopoverContent className="flex flex-col overflow-auto">
                    {versionItems.map((item) => {
                      // Build the href for each version
                      const href =
                        item.label === 'Current'
                          ? `/api/${apiSection}`
                          : `/api/${apiSection}/${item.label}`;
                      // Highlight current version
                      const isCurrent =
                        (item.label === 'Current' &&
                          apiVersionSlug.length === 0) ||
                        (item.label !== 'Current' &&
                          apiVersionSlug[0] === item.label);
                      return (
                        <Link
                          key={item.label}
                          href={href}
                          className={cn(
                            'px-3 py-1 rounded hover:bg-fd-muted',
                            isCurrent && 'font-bold bg-fd-muted'
                          )}>
                          API version: {item.label}
                        </Link>
                      );
                    })}
                  </PopoverContent>
                </Popover>
              )}
            </div>
          </div>
          <DocsDescription className="text-md mt-1">
            {page.data.description}
          </DocsDescription>
        </div>
      </div>
      <DocsBody>
        <MDXContent
          components={getMDXComponents({
            // this allows you to link to other pages with relative file paths
            a: createRelativeLink(source, page),
          })}
        />
      </DocsBody>
    </DocsPage>
  );
}

export async function generateStaticParams() {
  return source.generateParams();
}

export async function generateMetadata(props: {
  params: Promise<{ slug?: string[] }>;
}): Promise<Metadata> {
  const params = await props.params;
  const page = source.getPage(params.slug);
  if (!page) notFound();

  let title = `${page.data.title} | Cua Docs`;
  if (page.url.includes('api')) title = `${page.data.title} | Cua API Docs`;
  if (page.url.includes('guide'))
    title = `Guide: ${page.data.title} | Cua Docs`;

  return {
    title,
    description: page.data.description,
    openGraph: {
      title,
      description: page.data.description,
      type: 'article',
      siteName: 'Cua Docs',
      url: 'https://trycua.com/docs',
    },
  };
}

```

--------------------------------------------------------------------------------
/libs/python/mcp-server/mcp_server/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import base64
import inspect
import logging
import os
import signal
import sys
import traceback
import uuid
from typing import Any, Dict, List, Optional, Union, Tuple

import anyio

# Configure logging to output to stderr for debug visibility
logging.basicConfig(
    level=logging.DEBUG,  # Changed to DEBUG
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger("mcp-server")

# More visible startup message
logger.debug("MCP Server module loading...")

try:
    from mcp.server.fastmcp import Context, FastMCP
    # Use the canonical Image type
    from mcp.server.fastmcp.utilities.types import Image

    logger.debug("Successfully imported FastMCP")
except ImportError as e:
    logger.error(f"Failed to import FastMCP: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from computer import Computer
    from agent import ComputerAgent

    logger.debug("Successfully imported Computer and Agent modules")
except ImportError as e:
    logger.error(f"Failed to import Computer/Agent modules: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

try:
    from .session_manager import get_session_manager, initialize_session_manager, shutdown_session_manager
    logger.debug("Successfully imported session manager")
except ImportError as e:
    logger.error(f"Failed to import session manager: {e}")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)

def get_env_bool(key: str, default: bool = False) -> bool:
    """Get boolean value from environment variable."""
    return os.getenv(key, str(default)).lower() in ("true", "1", "yes")

async def _maybe_call_ctx_method(ctx: Context, method_name: str, *args, **kwargs) -> None:
    """Call a context helper if it exists, awaiting the result when necessary."""
    method = getattr(ctx, method_name, None)
    if not callable(method):
        return
    result = method(*args, **kwargs)
    if inspect.isawaitable(result):
        await result

def _normalise_message_content(content: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Normalise message content to a list of structured parts."""
    if isinstance(content, list):
        return content
    if content is None:
        return []
    return [{"type": "output_text", "text": str(content)}]

def _extract_text_from_content(content: Union[str, List[Dict[str, Any]]]) -> str:
    """Extract textual content for inclusion in the aggregated result string."""
    if isinstance(content, str):
        return content
    texts: List[str] = []
    for part in content or []:
        if not isinstance(part, dict):
            continue
        if part.get("type") in {"output_text", "text"} and part.get("text"):
            texts.append(str(part["text"]))
    return "\n".join(texts)

def _serialise_tool_content(content: Any) -> str:
    """Convert tool outputs into a string for aggregation."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts: List[str] = []
        for part in content:
            if isinstance(part, dict) and part.get("type") in {"output_text", "text"} and part.get("text"):
                texts.append(str(part["text"]))
        if texts:
            return "\n".join(texts)
    if content is None:
        return ""
    return str(content)

def serve() -> FastMCP:
    """Create and configure the MCP server."""
    # NOTE: Do not pass model_config here; FastMCP 2.12.x doesn't support it.
    server = FastMCP(name="cua-agent")

    @server.tool(structured_output=False)
    async def screenshot_cua(ctx: Context, session_id: Optional[str] = None) -> Any:
        """
        Take a screenshot of the current macOS VM screen and return the image.
        
        Args:
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()
        
        async with session_manager.get_session(session_id) as session:
            screenshot = await session.computer.interface.screenshot()
            # Returning Image object is fine when structured_output=False
            return Image(format="png", data=screenshot)

    @server.tool(structured_output=False)
    async def run_cua_task(ctx: Context, task: str, session_id: Optional[str] = None) -> Any:
        """
        Run a Computer-Use Agent (CUA) task in a macOS VM and return (combined text, final screenshot).
        
        Args:
            task: The task description for the agent to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
        """
        session_manager = get_session_manager()
        task_id = str(uuid.uuid4())
        
        try:
            logger.info(f"Starting CUA task: {task} (task_id: {task_id})")

            async with session_manager.get_session(session_id) as session:
                # Register this task with the session
                await session_manager.register_task(session.session_id, task_id)
                
                try:
                    # Get model name
                    model_name = os.getenv("CUA_MODEL_NAME", "anthropic/claude-3-5-sonnet-20241022")
                    logger.info(f"Using model: {model_name}")

                    # Create agent with the new v0.4.x API
                    agent = ComputerAgent(
                        model=model_name,
                        only_n_most_recent_images=int(os.getenv("CUA_MAX_IMAGES", "3")),
                        verbosity=logging.INFO,
                        tools=[session.computer],
                    )

                    messages = [{"role": "user", "content": task}]

                    # Collect all results
                    aggregated_messages: List[str] = []
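                    # Each chunk yielded by agent.run() carries an "output" list that may mix
                    # assistant messages, tool calls, and tool results; forward each kind to the
                    # MCP context (when it exposes a matching yield_* helper) and collect any
                    # text for the aggregated result string.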
                    async for result in agent.run(messages):
                        logger.info("Agent processing step")
                        await ctx.info("Agent processing step")

                        outputs = result.get("output", [])
                        for output in outputs:
                            output_type = output.get("type")

                            if output_type == "message":
                                logger.debug("Streaming assistant message: %s", output)
                                content = _normalise_message_content(output.get("content"))
                                aggregated_text = _extract_text_from_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                await _maybe_call_ctx_method(
                                    ctx,
                                    "yield_message",
                                    role=output.get("role", "assistant"),
                                    content=content,
                                )

                            elif output_type in {"tool_use", "computer_call", "function_call"}:
                                logger.debug("Streaming tool call: %s", output)
                                call_id = output.get("id") or output.get("call_id")
                                tool_name = output.get("name") or output.get("action", {}).get("type")
                                tool_input = output.get("input") or output.get("arguments") or output.get("action")
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_call",
                                        name=tool_name,
                                        call_id=call_id,
                                        input=tool_input,
                                    )

                            elif output_type in {"tool_result", "computer_call_output", "function_call_output"}:
                                logger.debug("Streaming tool output: %s", output)
                                call_id = output.get("call_id") or output.get("id")
                                content = output.get("content") or output.get("output")
                                aggregated_text = _serialise_tool_content(content)
                                if aggregated_text:
                                    aggregated_messages.append(aggregated_text)
                                if call_id:
                                    await _maybe_call_ctx_method(
                                        ctx,
                                        "yield_tool_output",
                                        call_id=call_id,
                                        output=content,
                                        is_error=output.get("status") == "failed" or output.get("is_error", False),
                                    )

                    logger.info("CUA task completed successfully")
                    await ctx.info("CUA task completed successfully")

                    screenshot_image = Image(
                        format="png",
                        data=await session.computer.interface.screenshot(),
                    )

                    return (
                        "\n".join(aggregated_messages).strip() or "Task completed with no text output.",
                        screenshot_image,
                    )
                    
                finally:
                    # Unregister the task from the session
                    await session_manager.unregister_task(session.session_id, task_id)

        except Exception as e:
            error_msg = f"Error running CUA task: {str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            await ctx.error(error_msg)
            
            # Try to get a screenshot from the session if available
            try:
                if session_id:
                    async with session_manager.get_session(session_id) as session:
                        screenshot = await session.computer.interface.screenshot()
                        return (
                            f"Error during task execution: {str(e)}",
                            Image(format="png", data=screenshot),
                        )
            except Exception:
                pass
                
            # If we can't get a screenshot, return a placeholder
            return (
                f"Error during task execution: {str(e)}",
                Image(format="png", data=b""),
            )

    @server.tool(structured_output=False)
    async def run_multi_cua_tasks(ctx: Context, tasks: List[str], session_id: Optional[str] = None, concurrent: bool = False) -> Any:
        """
        Run multiple CUA tasks and return a list of (combined text, screenshot).
        
        Args:
            tasks: List of task descriptions to execute
            session_id: Optional session ID for multi-client support. If not provided, a new session will be created.
            concurrent: If True, run tasks concurrently. If False, run sequentially (default).
        """
        total_tasks = len(tasks)
        if total_tasks == 0:
            await ctx.report_progress(1.0)
            return []

        session_manager = get_session_manager()
        
        if concurrent and total_tasks > 1:
            # Run tasks concurrently
            logger.info(f"Running {total_tasks} tasks concurrently")
            await ctx.info(f"Running {total_tasks} tasks concurrently")
            
            # Create tasks with progress tracking
            async def run_task_with_progress(task_index: int, task: str) -> Tuple[int, Tuple[str, Image]]:
                await ctx.report_progress(task_index / total_tasks)
                result = await run_cua_task(ctx, task, session_id)
                await ctx.report_progress((task_index + 1) / total_tasks)
                return task_index, result
            
            # Create all task coroutines
            task_coroutines = [run_task_with_progress(i, task) for i, task in enumerate(tasks)]
            
            # Wait for all tasks to complete
            results_with_indices = await asyncio.gather(*task_coroutines, return_exceptions=True)
            
            # asyncio.gather preserves input order, so results are already in
            # task order; convert any exceptions into error placeholders.
            results: List[Tuple[str, Image]] = []
            for result in results_with_indices:
                if isinstance(result, Exception):
                    logger.error(f"Task failed with exception: {result}")
                    await ctx.error(f"Task failed: {str(result)}")
                    results.append((f"Task failed: {str(result)}", Image(format="png", data=b"")))
                else:
                    _, task_result = result
                    results.append(task_result)
            
            return results
        else:
            # Run tasks sequentially (original behavior)
            logger.info(f"Running {total_tasks} tasks sequentially")
            await ctx.info(f"Running {total_tasks} tasks sequentially")
            
            results: List[Tuple[str, Image]] = []
            for i, task in enumerate(tasks):
                logger.info(f"Running task {i+1}/{total_tasks}: {task}")
                await ctx.info(f"Running task {i+1}/{total_tasks}: {task}")

                await ctx.report_progress(i / total_tasks)
                task_result = await run_cua_task(ctx, task, session_id)
                results.append(task_result)
                await ctx.report_progress((i + 1) / total_tasks)

            return results

    @server.tool(structured_output=False)
    async def get_session_stats(ctx: Context) -> Dict[str, Any]:
        """
        Get statistics about active sessions and resource usage.
        """
        session_manager = get_session_manager()
        return session_manager.get_session_stats()

    @server.tool(structured_output=False)
    async def cleanup_session(ctx: Context, session_id: str) -> str:
        """
        Cleanup a specific session and release its resources.
        
        Args:
            session_id: The session ID to cleanup
        """
        session_manager = get_session_manager()
        await session_manager.cleanup_session(session_id)
        return f"Session {session_id} cleanup initiated"

    return server


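# Module-level server instance; run_server() below serves it over stdio.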
server = serve()

async def run_server():
    """Run the MCP server with proper lifecycle management."""
    session_manager = None
    try:
        logger.debug("Starting MCP server...")
        
        # Initialize session manager
        session_manager = await initialize_session_manager()
        logger.info("Session manager initialized")
        
        # Set up signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            # Create a task to shutdown gracefully
            asyncio.create_task(graceful_shutdown())
        
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        
        # Start the server
        logger.info("Starting FastMCP server...")
        # Use run_stdio_async directly instead of server.run() to avoid nested event loops
        await server.run_stdio_async()
        
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        raise
    finally:
        # Ensure cleanup happens
        if session_manager:
            logger.info("Shutting down session manager...")
            await shutdown_session_manager()

async def graceful_shutdown():
    """Gracefully shutdown the server and all sessions."""
    logger.info("Initiating graceful shutdown...")
    try:
        await shutdown_session_manager()
        logger.info("Graceful shutdown completed")
    except Exception as e:
        logger.error(f"Error during graceful shutdown: {e}")
    finally:
        # Exit the process
        os._exit(0)

def main():
    """Run the MCP server with proper async lifecycle management."""
    try:
        # Use anyio.run instead of asyncio.run to avoid nested event loop issues
        anyio.run(run_server)
    except KeyboardInterrupt:
        logger.info("Server interrupted by user")
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

```
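
For reference, these tools can be exercised from any MCP client over stdio. The sketch below is a minimal example using the official `mcp` Python SDK and is not part of the repository; the launch command (`python -m mcp_server.server`) and the sample task string are assumptions, so adjust them to match how the package is actually installed and started.

```python
# Minimal client sketch (assumptions noted above): connects to the server over
# stdio, lists the registered tools, and runs a single CUA task.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def demo() -> None:
    # Assumed launch command; replace with your actual entry point if different.
    params = StdioServerParameters(command="python", args=["-m", "mcp_server.server"])

    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Tools registered in serve(): screenshot_cua, run_cua_task,
            # run_multi_cua_tasks, get_session_stats, cleanup_session.
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # run_cua_task returns the aggregated text plus a final screenshot,
            # delivered back as MCP content blocks.
            result = await session.call_tool(
                "run_cua_task", arguments={"task": "Open the Notes app"}
            )
            print(result.content)


if __name__ == "__main__":
    asyncio.run(demo())
```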