# Directory Structure

```
├── demonstration.gif
├── Dockerfile
├── icon.png
├── LICENSE
├── MANIFEST.in
├── pyproject.toml
├── README.md
├── smithery.yaml
├── src
│   ├── computer_control_mcp
│   │   ├── __init__.py
│   │   ├── __main__.py
│   │   ├── cli.py
│   │   ├── core.py
│   │   ├── FZYTK.TTF
│   │   ├── gui.py
│   │   ├── server.py
│   │   ├── test_image.png
│   │   └── test.py
│   └── README.md
├── tests
│   ├── conftest.py
│   ├── rapidocr_test.py
│   ├── README.md
│   ├── run_cli.py
│   ├── run_server.py
│   ├── setup.py
│   ├── test_computer_control.py
│   └── test_screenshot.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown
1 | # Computer Control MCP Tests
2 | 
3 | This directory contains the tests for the Computer Control MCP package.
4 | 
5 | ## Running Tests
6 | 
7 | To run the tests, use pytest:
8 | 
9 | ```bash
10 | pytest
11 | ```
12 | 
13 | Or with specific test:
14 | 
15 | ```bash
16 | pytest tests/test_computer_control.py
17 | ```
18 | 
19 | ## Test Structure
20 | 
21 | - `conftest.py`: Pytest configuration
22 | - `test_computer_control.py`: Tests for the core functionality
23 | 
```

--------------------------------------------------------------------------------
/src/README.md:
--------------------------------------------------------------------------------

```markdown
1 | # Computer Control MCP Source Code
2 | 
3 | This directory contains the source code for the Computer Control MCP package.
4 | 
5 | ## Structure
6 | 
7 | - `computer_control_mcp/`: Main package directory
8 |   - `__init__.py`: Package initialization
9 |   - `__main__.py`: Entry point for running as a module
10 |   - `core.py`: Core functionality
11 |   - `cli.py`: Command-line interface
12 |   - `gui.py`: Graphical user interface for testing
13 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
1 | # Computer Control MCP
2 | 
3 | ### MCP server that provides computer control capabilities, like mouse, keyboard, OCR, etc. using PyAutoGUI, RapidOCR, ONNXRuntime. Similar to 'computer-use' by Anthropic. With Zero External Dependencies.
4 | 
5 | <div align="center" style="text-align:center;font-family: monospace; display: flex; align-items: center; justify-content: center; width: 100%; gap: 10px">
6 |   <a href="https://nextjs-boilerplate-ashy-nine-64.vercel.app/demo-computer-control"><img
7 |     src="https://komarev.com/ghpvc/?username=AB498&label=DEMO&style=for-the-badge&color=CC0000" /></a>
8 |   <a href="https://discord.gg/ZeeqSBpjU2"><img
9 |     src="https://img.shields.io/discord/1095854826786668545?style=for-the-badge&color=0000CC" alt="Discord"></a>
10 |   <a href="https://img.shields.io/badge/License-MIT-yellow.svg"><img
11 |     src="https://img.shields.io/badge/License-MIT-yellow.svg?style=for-the-badge&color=00CC00" alt="License: MIT"></a>
12 |   <a href="https://pypi.org/project/computer-control-mcp"><img
13 |     src="https://img.shields.io/pypi/v/computer-control-mcp?style=for-the-badge" alt="PyPi"></a>
14 | </div>
15 | 
16 | ---
17 | 
18 | 
19 | 
20 | ## Quick Usage (MCP Setup Using `uvx`)
21 | 
22 | ***Note:** Running `uvx computer-control-mcp@latest` for the first time downloads the Python dependencies (around 70MB), which may take some time. It is recommended to run it once in a terminal before using it as an MCP server. Subsequent runs will be instant.*
23 | 
24 | ```json
25 | {
26 |   "mcpServers": {
27 |     "computer-control-mcp": {
28 |       "command": "uvx",
29 |       "args": ["computer-control-mcp@latest"]
30 |     }
31 |   }
32 | }
33 | ```
34 | 
35 | OR install globally with `pip`:
36 | ```bash
37 | pip install computer-control-mcp
38 | ```
39 | Then run the server with:
40 | ```bash
41 | computer-control-mcp # instead of uvx computer-control-mcp, so you can use the latest version, also you can `uv cache clean` to clear the cache and `uvx` again to use latest version.
42 | ```
43 | 
44 | ## Features
45 | 
46 | - Control mouse movements and clicks
47 | - Type text at the current cursor position
48 | - Take screenshots of the entire screen or specific windows with optional saving to downloads directory
49 | - Extract text from screenshots using OCR (Optical Character Recognition)
50 | - List and activate windows
51 | - Press keyboard keys
52 | - Drag and drop operations
53 | 
54 | ## Available Tools
55 | 
56 | ### Mouse Control
57 | - `click_screen(x: int, y: int)`: Click at specified screen coordinates
58 | - `move_mouse(x: int, y: int)`: Move mouse cursor to specified coordinates
59 | - `drag_mouse(from_x: int, from_y: int, to_x: int, to_y: int, duration: float = 0.5)`: Drag mouse from one position to another
60 | - `mouse_down(button: str = "left")`: Hold down a mouse button ('left', 'right', 'middle')
61 | - `mouse_up(button: str = "left")`: Release a mouse button ('left', 'right', 'middle')
62 | 
63 | ### Keyboard Control
64 | - `type_text(text: str)`: Type the specified text at current cursor position
65 | - `press_key(key: str)`: Press a specified keyboard key
66 | - `key_down(key: str)`: Hold down a specific keyboard key until released
67 | - `key_up(key: str)`: Release a specific keyboard key
68 | - `press_keys(keys: Union[str, List[Union[str, List[str]]]])`: Press keyboard keys (supports single keys, sequences, and combinations)
69 | 
70 | ### Screen and Window Management
71 | - `take_screenshot(title_pattern: str = None, use_regex: bool = False, threshold: int = 60, scale_percent_for_ocr: int = None, save_to_downloads: bool = False)`: Capture screen or window
72 | - `take_screenshot_with_ocr(title_pattern: str = None, use_regex: bool = False, threshold: int = 10, scale_percent_for_ocr: int = None, save_to_downloads: bool = False)`: Extract and return text with coordinates using OCR from screen or window
73 | - `get_screen_size()`: Get current screen resolution
74 | - `list_windows()`: List all open windows
75 | - `activate_window(title_pattern: str, use_regex: bool = False, threshold: int = 60)`: Bring specified window to foreground
76 | 
77 | ## Development
78 | 
79 | ### Setting up the Development Environment
80 | 
81 | ```bash
82 | # Clone the repository
83 | git clone https://github.com/AB498/computer-control-mcp.git
84 | cd computer-control-mcp
85 | 
86 | # Install in development mode
87 | pip install -e .
88 | 
89 | # Start server
90 | python -m computer_control_mcp.core
91 | 
92 | # -- OR --
93 | 
94 | # Build
95 | hatch build
96 | 
97 | # Non-Windows
98 | pip install dist/*.whl --upgrade
99 | 
100 | # Windows
101 | $latest = Get-ChildItem .\dist\*.whl | Sort-Object LastWriteTime -Descending | Select-Object -First 1
102 | pip install $latest.FullName --upgrade
103 | 
104 | # Run
105 | computer-control-mcp
106 | ```
107 | 
108 | ### Running Tests
109 | 
110 | ```bash
111 | python -m pytest
112 | ```
113 | 
114 | ## API Reference
115 | 
116 | See the [API Reference](docs/api.md) for detailed information about the available functions and classes.
117 | 
118 | ## License
119 | 
120 | MIT
121 | 
122 | ## For more information or help
123 | 
124 | - [Email ([email protected])](mailto:[email protected])
125 | - [Discord (CodePlayground)](https://discord.gg/ZeeqSBpjU2)
126 | 
```

--------------------------------------------------------------------------------
/tests/run_cli.py:
--------------------------------------------------------------------------------

```python
1 | #!/usr/bin/env python
2 | """
3 | Simple script to run the Computer Control MCP CLI.
4 | """
5 | 
6 | from computer_control_mcp.cli import main
7 | 
8 | if __name__ == "__main__":
9 |     main()
10 | 
```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Pytest configuration file.
3 | """
4 | 
5 | import pytest
6 | import sys
7 | import os
8 | 
9 | # Add the src directory to the Python path
10 | sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
11 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Computer Control MCP - Python package for computer control via MCP.
3 | 
4 | This package provides computer control capabilities using PyAutoGUI through a
5 | Model Context Protocol (MCP) server.
6 | """
7 | 
8 | from computer_control_mcp.core import mcp, main
9 | 
10 | __version__ = "0.1.2"
11 | __all__ = ["mcp", "main"]
12 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/server.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Server module for Computer Control MCP.
3 | 
4 | This module provides a simple way to run the MCP server.
5 | """
6 | 
7 | from computer_control_mcp.core import main as run_server
8 | 
9 | def main():
10 |     """Run the MCP server."""
11 |     print("Starting Computer Control MCP server...")
12 |     run_server()
13 | 
14 | if __name__ == "__main__":
15 |     main()
16 | 
```

--------------------------------------------------------------------------------
/tests/run_server.py:
--------------------------------------------------------------------------------

```python
1 | #!/usr/bin/env python
2 | """
3 | Simple script to run the Computer Control MCP server.
4 | """
5 | 
6 | # import sys
7 | # import os
8 | # sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
9 | # from computer_control_mcp.core import main
10 | 
11 | from computer_control_mcp.core import main
12 | 
13 | if __name__ == "__main__":
14 |     print("Starting Computer Control MCP server...")
15 |     main()
16 | 
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
1 | # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
2 | 
3 | startCommand:
4 |   type: stdio
5 |   configSchema:
6 |     # JSON Schema defining the configuration options for the MCP.
7 |     type: object
8 |     description: Empty config
9 |   commandFunction:
10 |     # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
11 |     |-
12 |     (config) => ({ command: 'python', args: ['src/computer_control_mcp/core.py'] })
13 |   exampleConfig: {}
14 | 
```

--------------------------------------------------------------------------------
/tests/setup.py:
--------------------------------------------------------------------------------

```python
1 | #!/usr/bin/env python
2 | """
3 | Backward compatibility setup.py file for Computer Control MCP.
4 | This file is provided for backward compatibility with tools that don't support pyproject.toml.
5 | """
6 | 
7 | import setuptools
8 | 
9 | if __name__ == "__main__":
10 |     try:
11 |         setuptools.setup()
12 |     except Exception as e:
13 |         print(f"Error: {e}")
14 |         print("\nThis package uses pyproject.toml for configuration.")
15 |         print("Please use a PEP 517 compatible build tool like pip or build.")
16 |         print("For example: pip install .")
17 | 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
1 | # Use a lightweight Python base image
2 | FROM python:3.12-slim
3 | 
4 | # Set environment variables for Python
5 | ENV PYTHONDONTWRITEBYTECODE=1 \
6 |     PYTHONUNBUFFERED=1
7 | 
8 | # Set working directory
9 | WORKDIR /app
10 | 
11 | # Copy dependency file(s)
12 | COPY pyproject.toml .
13 | COPY src/ src/
14 | COPY README.md README.md
15 | 
16 | # Install build backend (Hatchling)
17 | RUN pip install --upgrade pip && \
18 |     pip install hatchling && \
19 |     pip install -e .
20 | 
21 | # Copy any additional files (e.g. configs, CLI, entrypoints)
22 | COPY . .
23 | 
24 | # Default command (can be overridden)
25 | CMD ["python", "-m", "computer_control_mcp"]
26 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/__main__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Entry point for running the Computer Control MCP as a module.
3 | 
4 | This module serves as the main entry point for the package.
5 | When executed directly (e.g., with `python -m computer_control_mcp`),
6 | it will run the CLI interface.
7 | 
8 | For CLI functionality, use:
9 |     computer-control-mcp <command>
10 |     python -m computer_control_mcp <command>
11 | """
12 | 
13 | from computer_control_mcp.cli import main as cli_main
14 | 
15 | def main():
16 |     """Main entry point for the package."""
17 |     # Run the CLI when the module is executed directly
18 |     cli_main()
19 | 
20 | if __name__ == "__main__":
21 |     main()
22 | 
```

--------------------------------------------------------------------------------
/tests/test_screenshot.py:
--------------------------------------------------------------------------------

```python
1 | import sys
2 | sys.path.append('src')
3 | from computer_control_mcp.core import take_screenshot
4 | 
5 | # Test with save_to_downloads=False
6 | result = take_screenshot(mode='whole_screen', save_to_downloads=False)
7 | print('Base64 image included:', 'base64_image' in result)
8 | print('MCP Image included:', 'image' in result)
9 | 
10 | # Test with save_to_downloads=True
11 | result = take_screenshot(mode='whole_screen', save_to_downloads=True)
12 | print('Base64 image included:', 'base64_image' in result)
13 | print('MCP Image included:', 'image' in result)
14 | print('File path included:', 'file_path' in result)
15 | 
```

--------------------------------------------------------------------------------
/tests/rapidocr_test.py:
--------------------------------------------------------------------------------

```python
1 | import cv2
2 | from rapidocr import RapidOCR
3 | from rapidocr_onnxruntime import VisRes
4 | 
5 | image_path = r"C:\Users\Admin\AppData\Local\Temp\tmpdw2d8r14\screenshot_20250815_033153_f99a8396.png"
6 | img = cv2.imread(image_path)
7 | if img is None:
8 |     print(f"Failed to load img: {image_path}")
9 | else:
10 |     print(f"Loaded img: {image_path}, shape: {img.shape}")
11 |     engine = RapidOCR()
12 |     vis = VisRes()
13 |     output = engine(img)
14 | 
15 |     # Separate into boxes, texts, and scores
16 |     boxes = output.boxes
17 |     txts = output.txts
18 |     scores = output.scores
19 |     zipped_results = list(zip(boxes, txts, scores))
20 |     print(f"Found {len(zipped_results)} text items in OCR result.")
21 |     print(f"First 10 items: {str(zipped_results[:10]).encode('utf-8', errors='ignore')}")
22 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
1 | [build-system]
2 | requires = ["hatchling"]
3 | build-backend = "hatchling.build"
4 | 
5 | [project]
6 | name = "computer-control-mcp"
7 | version = "0.3.6"
8 | description = "MCP server that provides computer control capabilities, like mouse, keyboard, OCR, etc. using PyAutoGUI, RapidOCR, ONNXRuntime. Similar to 'computer-use' by Anthropic. With Zero External Dependencies."
9 | readme = "README.md"
10 | requires-python = ">=3.8"
11 | license = {text = "MIT"}
12 | authors = [{name = "AB498", email = "[email protected]"}]
13 | classifiers = [
14 |     "Development Status :: 4 - Beta",
15 |     "Intended Audience :: Developers",
16 |     "License :: OSI Approved :: MIT License",
17 |     "Programming Language :: Python :: 3",
18 |     "Programming Language :: Python :: 3.12",
19 |     "Topic :: Software Development :: Libraries",
20 |     "Topic :: Utilities"
21 | ]
22 | dependencies = [
23 |     "pyautogui==0.9.54",
24 |     "mcp[cli]==1.13.0",
25 |     "pillow==11.3.0",
26 |     "pygetwindow==0.0.9",
27 |     "pywinctl==0.4.1",
28 |     "fuzzywuzzy==0.18.0",
29 |     "rapidocr==3.3.1",
30 |     "onnxruntime==1.22.0",
31 |     "rapidocr_onnxruntime==1.2.3",
32 |     "opencv-python==4.12.0.88",
33 |     "python-Levenshtein>=0.20.9",
34 |     "mss>=7.0.0"
35 | ]
36 | 
37 | [project.urls]
38 | Homepage = "https://github.com/AB498/computer-control-mcp"
39 | Issues = "https://github.com/AB498/computer-control-mcp/issues"
40 | Documentation = "https://github.com/AB498/computer-control-mcp#readme"
41 | 
42 | [project.scripts]
43 | computer-control-mcp = "computer_control_mcp.cli:main"
44 | computer-control-mcp-server = "computer_control_mcp.server:main"
45 | 
46 | [tool.hatch.build]
47 | sources = ["src"]
48 | packages = ["src/computer_control_mcp"]
49 | 
50 | [tool.hatch.build.targets.wheel]
51 | packages = ["src/computer_control_mcp"]
52 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/gui.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | GUI Test Harness for Computer Control MCP.
3 | 
4 | This module provides a graphical user interface for testing the Computer Control MCP functionality.
5 | """
6 | 
7 | import tkinter as tk
8 | from tkinter import ttk, scrolledtext
9 | from PIL import Image, ImageTk
10 | import pyautogui
11 | import json
12 | import io
13 | 
14 | from computer_control_mcp.core import mcp
15 | 
16 | class TestHarnessGUI:
17 |     def __init__(self, root):
18 |         self.root = root
19 |         self.root.title("Computer Control Test Harness")
20 |         self.root.geometry("800x600")
21 | 
22 |         # Create main frame with scrollbar
23 |         self.main_frame = ttk.Frame(root)
24 |         self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
25 | 
26 |         # Create test sections
27 |         self.create_click_test_section()
28 |         self.create_type_text_section()
29 |         self.create_screenshot_section()
30 |         self.create_output_section()
31 | 
32 |         # Initialize test results
33 |         self.test_results = {}
34 | 
35 |     def create_click_test_section(self):
36 |         frame = ttk.LabelFrame(self.main_frame, text="Mouse Click Test")
37 |         frame.pack(fill=tk.X, padx=5, pady=5)
38 | 
39 |         # Coordinates input
40 |         coord_frame = ttk.Frame(frame)
41 |         coord_frame.pack(fill=tk.X, padx=5, pady=5)
42 | 
43 |         ttk.Label(coord_frame, text="X:").pack(side=tk.LEFT)
44 |         self.x_entry = ttk.Entry(coord_frame, width=10)
45 |         self.x_entry.pack(side=tk.LEFT, padx=5)
46 | 
47 |         ttk.Label(coord_frame, text="Y:").pack(side=tk.LEFT)
48 |         self.y_entry = ttk.Entry(coord_frame, width=10)
49 |         self.y_entry.pack(side=tk.LEFT, padx=5)
50 | 
51 |         ttk.Button(frame, text="Test Click", command=self.test_click).pack(pady=5)
52 | 
53 |     def create_type_text_section(self):
54 |         frame = ttk.LabelFrame(self.main_frame, text="Type Text Test")
55 |         frame.pack(fill=tk.X, padx=5, pady=5)
56 | 
57 |         ttk.Label(frame, text="Text to type:").pack(pady=2)
58 |         self.text_entry = ttk.Entry(frame)
59 |         self.text_entry.pack(fill=tk.X, padx=5, pady=2)
60 | 
61 |         ttk.Button(frame, text="Test Type Text", command=self.test_type_text).pack(pady=5)
62 | 
63 |     def create_screenshot_section(self):
64 |         frame = ttk.LabelFrame(self.main_frame, text="Screenshot Test")
65 |         frame.pack(fill=tk.X, padx=5, pady=5)
66 | 
67 |         ttk.Button(frame, text="Take Screenshot", command=self.test_screenshot).pack(pady=5)
68 | 
69 |         # Canvas for screenshot preview
70 |         self.screenshot_canvas = tk.Canvas(frame, width=200, height=150)
71 |         self.screenshot_canvas.pack(pady=5)
72 | 
73 |     def create_output_section(self):
74 |         frame = ttk.LabelFrame(self.main_frame, text="Test Output")
75 |         frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
76 | 
77 |         self.output_text = scrolledtext.ScrolledText(frame, height=10)
78 |         self.output_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
79 | 
80 |     def log_output(self, test_name, request_data, response_data):
81 |         self.output_text.insert(tk.END, f"\n===== TEST: {test_name} =====\n")
82 |         self.output_text.insert(tk.END, f"REQUEST: {json.dumps(request_data, indent=2)}\n")
83 |         self.output_text.insert(tk.END, f"RESPONSE: {response_data}\n")
84 |         self.output_text.insert(tk.END, "======================\n")
85 |         self.output_text.see(tk.END)
86 | 
87 |     def test_click(self):
88 |         try:
89 |             x = int(self.x_entry.get())
90 |             y = int(self.y_entry.get())
91 |             request_data = {"x": x, "y": y}
92 |             result = mcp.click_screen(**request_data)
93 |             self.log_output("click_screen", request_data, result)
94 |         except Exception as e:
95 |             self.log_output("click_screen", {"x": self.x_entry.get(), "y": self.y_entry.get()}, f"Error: {str(e)}")  # log raw entries: request_data is unbound if int() fails
96 | 
97 |     def test_type_text(self):
98 |         try:
99 |             text = self.text_entry.get()
100 |             request_data = {"text": text}
101 |             result = mcp.type_text(**request_data)
102 |             self.log_output("type_text", request_data, result)
103 |         except Exception as e:
104 |             self.log_output("type_text", request_data, f"Error: {str(e)}")
105 | 
106 |     def test_screenshot(self):
107 |         try:
108 |             result = mcp.take_screenshot()
109 |             # Convert bytes to image for preview
110 |             image = Image.open(io.BytesIO(result.data))
111 |             # Resize for preview
112 |             image.thumbnail((200, 150))
113 |             photo = ImageTk.PhotoImage(image)
114 |             self.screenshot_canvas.create_image(100, 75, image=photo)
115 |             self.screenshot_canvas.image = photo  # Keep reference
116 |             self.log_output("take_screenshot", {}, "Screenshot taken successfully")
117 |         except Exception as e:
118 |             self.log_output("take_screenshot", {}, f"Error: {str(e)}")
119 | 
120 | def main():
121 |     root = tk.Tk()
122 |     app = TestHarnessGUI(root)
123 |     root.mainloop()
124 | 
125 | if __name__ == "__main__":
126 |     main()
127 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/cli.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Command-line interface for Computer Control MCP.
3 | 
4 | This module provides a command-line interface for interacting with the Computer Control MCP.
5 | """
6 | 
7 | import argparse
8 | import sys
9 | from computer_control_mcp.core import mcp, main as run_server
10 | 
11 | def parse_args():
12 |     """Parse command-line arguments."""
13 |     parser = argparse.ArgumentParser(description="Computer Control MCP CLI")
14 | 
15 |     subparsers = parser.add_subparsers(dest="command", help="Command to run")
16 | 
17 |     # Server command
18 |     server_parser = subparsers.add_parser("server", help="Run the MCP server")
19 | 
20 |     # Click command
21 |     click_parser = subparsers.add_parser("click", help="Click at specified coordinates")
22 |     click_parser.add_argument("x", type=int, help="X coordinate")
23 |     click_parser.add_argument("y", type=int, help="Y coordinate")
24 | 
25 |     # Type text command
26 |     type_parser = subparsers.add_parser("type", help="Type text at current cursor position")
27 |     type_parser.add_argument("text", help="Text to type")
28 | 
29 |     # Screenshot command
30 |     screenshot_parser = subparsers.add_parser("screenshot", help="Take a screenshot")
31 |     screenshot_parser.add_argument("--mode", choices=["all_windows", "single_window", "whole_screen"],
32 |                                    default="whole_screen", help="Screenshot mode")
33 |     screenshot_parser.add_argument("--title", help="Window title pattern (for single_window mode)")
34 |     screenshot_parser.add_argument("--regex", action="store_true", help="Use regex for title matching")
35 |     screenshot_parser.add_argument("--output", help="Output file path (if not provided, saves to downloads directory)")
36 |     screenshot_parser.add_argument("--no-save", action="store_true", help="Don't save images to downloads directory")
37 | 
38 |     # List windows command
39 |     subparsers.add_parser("list-windows", help="List all open windows")
40 | 
41 |     # GUI command
42 |     subparsers.add_parser("gui", help="Launch the GUI test harness")
43 | 
44 |     return parser.parse_args()
45 | 
46 | def main():
47 |     """Main entry point for the CLI."""
48 |     args = parse_args()
49 | 
50 |     if args.command == "server":
51 |         run_server()
52 | 
53 |     elif args.command == "click":
54 |         # Call the tool using the call_tool method
55 |         import asyncio
56 |         result = asyncio.run(mcp.call_tool("click_screen", {"x": args.x, "y": args.y}))
57 |         print(result)
58 | 
59 |     elif args.command == "type":
60 |         # Call the tool using the call_tool method
61 |         import asyncio
62 |         result = asyncio.run(mcp.call_tool("type_text", {"text": args.text}))
63 |         print(result)
64 | 
65 |     elif args.command == "screenshot":
66 |         if args.mode == "single_window" and not args.title:
67 |             print("Error: --title is required for single_window mode")
68 |             sys.exit(1)
69 | 
70 |         # Call the tool using the call_tool method
71 |         import asyncio
72 |         result = asyncio.run(mcp.call_tool("take_screenshot", {
73 |             "mode": args.mode,
74 |             "title_pattern": args.title,
75 |             "use_regex": args.regex,
76 |             "save_to_downloads": not args.no_save
77 |         }))
78 | 
79 |         if args.output:
80 |             # Save the screenshot to a specific file path provided by user
81 |             with open(args.output, "wb") as f:
82 |                 f.write(result.image.data)
83 |             print(f"Screenshot saved to {args.output}")
84 |         elif hasattr(result, 'file_path'):
85 |             # If image was saved to downloads, show the path
86 |             print(f"Screenshot saved to {result.file_path}")
87 |         else:
88 |             print("Screenshot taken successfully")
89 | 
90 |         # If we have multiple results (all_windows mode)
91 |         if args.mode == "all_windows" and isinstance(result, list):
92 |             print("\nAll screenshots:")
93 |             for i, item in enumerate(result):
94 |                 if hasattr(item, 'file_path'):
95 |                     window_title = item.window_info.title if hasattr(item, 'window_info') else f"Window {i+1}"
96 |                     print(f"{i+1}. {window_title}: {item.file_path}")
97 | 
98 |     elif args.command == "list-windows":
99 |         # Call the tool using the call_tool method
100 |         import asyncio
101 |         result = asyncio.run(mcp.call_tool("list_windows", {}))
102 | 
103 |         # Parse the result
104 |         windows = []
105 |         for item in result:
106 |             if hasattr(item, 'text'):
107 |                 try:
108 |                     import json
109 |                     window_info = json.loads(item.text)
110 |                     windows.append(window_info)
111 |                 except json.JSONDecodeError:
112 |                     print(f"Failed to parse window info: {item.text}")
113 | 
114 |         # Display the windows
115 |         for i, window in enumerate(windows):
116 |             print(f"{i+1}. {window.get('title')} ({window.get('width')}x{window.get('height')})")
117 | 
118 |     elif args.command == "gui":
119 |         from computer_control_mcp.gui import main as run_gui
120 |         run_gui()
121 | 
122 |     else:
123 |         # When no command is specified, run the server by default
124 |         print("MCP server started!")
125 |         run_server()
126 | 
127 | if __name__ == "__main__":
128 |     main()
129 | 
```

--------------------------------------------------------------------------------
/tests/test_computer_control.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Tests for the Computer Control MCP package.
3 | """
4 | 
5 | import pytest
6 | from unittest.mock import Mock, patch
7 | import json
8 | import sys
9 | import tkinter as tk
10 | from tkinter import ttk
11 | import asyncio
12 | import os
13 | import ast
14 | from computer_control_mcp.core import mcp
15 | 
16 | # Helper function to print request/response JSON, skipping non-serializable properties
17 | def print_json_data(name, request_data=None, response_data=None):
18 |     def serialize(obj):
19 |         try:
20 |             json.dumps(obj)
21 |             return obj
22 |         except (TypeError, OverflowError):
23 |             return str(obj)
24 | 
25 |     print(f"\n===== TEST: {name} =====", file=sys.stderr)
26 |     if isinstance(request_data, dict):
27 |         serializable_request = {k: serialize(v) for k, v in request_data.items()}
28 |         print(f"REQUEST: {json.dumps(serializable_request, indent=2)}", file=sys.stderr)
29 |     elif request_data is not None:
30 |         print(f"REQUEST: {serialize(request_data)}", file=sys.stderr)
31 |     if response_data is not None:
32 |         if isinstance(response_data, dict):
33 |             serializable_response = {k: serialize(v) for k, v in response_data.items()}
34 |             print(
35 |                 f"RESPONSE: {json.dumps(serializable_response, indent=2)}",
36 |                 file=sys.stderr,
37 |             )
38 |         else:
39 |             print(f"RESPONSE: {serialize(response_data)}", file=sys.stderr)
40 |     print("======================\n", file=sys.stderr)
41 | 
42 | 
43 | # Test drag_mouse tool
44 | @pytest.mark.asyncio
45 | async def test_drag_mouse():
46 |     # Test data
47 |     test_window = tk.Tk()
48 |     test_window.title("Test Drag Mouse")
49 |     test_window.geometry("400x400")
50 | 
51 |     # Update the window to ensure coordinates are calculated
52 |     test_window.update_idletasks()
53 |     test_window.update()
54 | 
55 |     # Window title coordinates
56 |     window_x = test_window.winfo_x()
57 |     window_y = test_window.winfo_y()
58 | 
59 |     screen_width = test_window.winfo_screenwidth()
60 |     screen_height = test_window.winfo_screenheight()
61 |     center_x = screen_width // 2
62 |     center_y = screen_height // 2
63 |     request_data = {
64 |         "from_x": window_x + 55,
65 |         "from_y": window_y + 15,
66 |         "to_x": center_x,
67 |         "to_y": center_y,
68 |         "duration": 1.0,
69 |     }
70 | 
71 |     print(f"starting coordinates: x={window_x}, y={window_y}", file=sys.stderr)
72 | 
73 |     # Create an event to track completion
74 |     drag_complete = asyncio.Event()
75 | 
76 |     async def perform_drag():
77 |         try:
78 |             result = await mcp.call_tool("drag_mouse", request_data)
79 |             print(f"Result: {result}", file=sys.stderr)
80 |         finally:
81 |             drag_complete.set()
82 | 
83 |     # Start the drag operation
84 |     drag_task = asyncio.create_task(perform_drag())
85 | 
86 |     # Keep updating the window while waiting for drag to complete
87 |     while not drag_complete.is_set():
88 |         test_window.update()
89 |         await asyncio.sleep(0.01)  # Small delay to prevent high CPU usage
90 | 
91 |     # Wait for drag operation to complete
92 |     await drag_task
93 | 
94 |     window_x_end = test_window.winfo_x()
95 |     window_y_end = test_window.winfo_y()
96 |     print(f'ending coordinates: x={window_x_end}, y={window_y_end}', file=sys.stderr)
97 | 
98 |     assert window_y_end != window_y and window_x_end != window_x
99 | 
100 |     test_window.destroy()
101 | 
102 | 
103 | # Test list_windows tool
104 | @pytest.mark.asyncio
105 | async def test_list_windows():
106 |     # open tkinter
107 |     test_window = tk.Tk()
108 |     test_window.title("Test Window")
109 |     test_window.geometry("400x400")
110 | 
111 |     # Update the window to ensure coordinates are calculated
112 |     test_window.update_idletasks()
113 |     test_window.update()
114 | 
115 |     # list all windows
116 |     result = await mcp.call_tool("list_windows", {})
117 | 
118 |     # check if "Test Window" is in the list
119 |     # Parse the TextContent objects to extract the JSON data
120 |     window_data = []
121 |     for item in result:
122 |         if hasattr(item, 'text'):
123 |             try:
124 |                 window_info = json.loads(item.text)
125 |                 window_data.append(window_info)
126 |             except json.JSONDecodeError:
127 |                 print(f"Failed to parse JSON: {item.text}", file=sys.stderr)
128 | 
129 |     print(f"Result: {window_data}")
130 | 
131 |     assert any(window.get("title") == "Test Window" for window in window_data)
132 | 
133 |     test_window.destroy()
134 | 
135 | # Test screenshot with downloads
136 | @pytest.mark.asyncio
137 | async def test_take_screenshot():
138 |     # Take a screenshot of the whole screen and save to downloads
139 |     results = await mcp.call_tool("take_screenshot", {'save_to_downloads': True, 'mode': 'whole_screen'})
140 | 
141 |     for result in results:
142 |         # Check if file_path is in the result
143 |         if hasattr(result, 'text'):
144 |             try:
145 |                 result_dict = json.loads(result.text)
146 |                 print(f"Screenshot result: {result_dict['title']}", file=sys.stderr)
147 |                 assert 'file_path' in result_dict, "file_path should be in the result"
148 |                 file_path = result_dict['file_path']
149 | 
150 |                 # Check if the file exists
151 |                 assert os.path.exists(file_path), f"File {file_path} should exist"
152 |                 print(f"Screenshot saved to: {file_path}", file=sys.stderr)
153 | 
154 |                 # Clean up - remove the file
155 |                 os.remove(file_path)
156 |                 print(f"Removed test file: {file_path}", file=sys.stderr)
157 |             except (ValueError, SyntaxError, AttributeError) as e:
158 |                 print(f"Error processing result: {e}", file=sys.stderr)
159 |                 assert False, f"Error processing result: {e}"
160 | 
161 |     assert True, "Successfully tested screenshot with downloads"
162 | 
```

--------------------------------------------------------------------------------
/src/computer_control_mcp/test.py:
--------------------------------------------------------------------------------

```python
1 | import shutil
2 | import sys
3 | import os
4 | from typing import Dict, Any, List, Optional, Tuple
5 | from io import BytesIO
6 | import re
7 | import asyncio
8 | import uuid
9 | import datetime
10 | from pathlib import Path
11 | import tempfile
12 | 
13 | # --- Auto-install dependencies if needed ---
14 | import pyautogui
15 | from mcp.server.fastmcp import FastMCP, Image
16 | import mss
17 | from PIL import Image as PILImage
18 | import pygetwindow as gw
19 | from fuzzywuzzy import fuzz, process
20 | 
21 | import cv2
22 | from rapidocr_onnxruntime import RapidOCR, VisRes
23 | 
24 | 
25 | def log(message: str) -> None:
26 |     """Log a message to stderr."""
27 |     print(f"STDOUT: {message}", file=sys.stderr)
28 | 
29 | 
30 | def get_downloads_dir() -> Path:
31 |     """Get the OS downloads directory."""
32 |     if os.name == "nt":  # Windows
33 |         import winreg
34 | 
35 |         sub_key = r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders"
36 |         downloads_guid = "{374DE290-123F-4565-9164-39C4925E467B}"
37 |         with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key) as key:
38 |             downloads_dir = winreg.QueryValueEx(key, downloads_guid)[0]
39 |         return Path(downloads_dir)
40 |     else:  # macOS, Linux, etc.
41 |         return Path.home() / "Downloads"
42 | 
43 | 
44 | def _mss_screenshot(region=None):
45 |     """Take a screenshot using mss and return PIL Image.
46 | 
47 |     Args:
48 |         region: Optional tuple (left, top, width, height) for region capture
49 | 
50 |     Returns:
51 |         PIL Image object
52 |     """
53 |     with mss.mss() as sct:
54 |         if region is None:
55 |             # Full screen screenshot
56 |             monitor = sct.monitors[0]  # All monitors combined
57 |         else:
58 |             # Region screenshot
59 |             left, top, width, height = region
60 |             monitor = {
61 |                 "left": left,
62 |                 "top": top,
63 |                 "width": width,
64 |                 "height": height,
65 |             }
66 | 
67 |         screenshot = sct.grab(monitor)
68 |         # Convert to PIL Image
69 |         return PILImage.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX")
70 | 
71 | 
72 | def save_image_to_downloads(
73 |     image, prefix: str = "screenshot", directory: Path = None
74 | ) -> Tuple[str, bytes]:
75 |     """Save an image to the downloads directory and return its absolute path.
76 | 77 | Args: 78 | image: Either a PIL Image object or MCP Image object 79 | prefix: Prefix for the filename (default: 'screenshot') 80 | directory: Optional directory to save the image to 81 | 82 | Returns: 83 | Tuple of (absolute_path, image_data_bytes) 84 | """ 85 | # Create a unique filename with timestamp 86 | timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 87 | unique_id = str(uuid.uuid4())[:8] 88 | filename = f"{prefix}_{timestamp}_{unique_id}.png" 89 | 90 | # Get downloads directory 91 | downloads_dir = directory or get_downloads_dir() 92 | filepath = downloads_dir / filename 93 | 94 | # Handle different image types 95 | if hasattr(image, "save"): # PIL Image 96 | image.save(filepath) 97 | # Also get the bytes for returning 98 | img_byte_arr = BytesIO() 99 | image.save(img_byte_arr, format="PNG") 100 | img_bytes = img_byte_arr.getvalue() 101 | elif hasattr(image, "data"): # MCP Image 102 | img_bytes = image.data 103 | with open(filepath, "wb") as f: 104 | f.write(img_bytes) 105 | else: 106 | raise TypeError("Unsupported image type") 107 | 108 | log(f"Saved image to {filepath}") 109 | return str(filepath.absolute()), img_bytes 110 | 111 | 112 | def _find_matching_window( 113 | windows: any, 114 | title_pattern: str = None, 115 | use_regex: bool = False, 116 | threshold: int = 60, 117 | ) -> Optional[Dict[str, Any]]: 118 | """Helper function to find a matching window based on title pattern. 
119 | 120 | Args: 121 | windows: List of window dictionaries 122 | title_pattern: Pattern to match window title 123 | use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching 124 | threshold: Minimum score (0-100) required for a fuzzy match 125 | 126 | Returns: 127 | The best matching window or None if no match found 128 | """ 129 | if not title_pattern: 130 | log("No title pattern provided, returning None") 131 | return None 132 | 133 | # For regex matching 134 | if use_regex: 135 | for window in windows: 136 | if re.search(title_pattern, window["title"], re.IGNORECASE): 137 | log(f"Regex match found: {window['title']}") 138 | return window 139 | return None 140 | 141 | # For fuzzy matching using fuzzywuzzy 142 | # Extract all window titles 143 | window_titles = [window["title"] for window in windows] 144 | 145 | # Use process.extractOne to find the best match 146 | best_match_title, score = process.extractOne( 147 | title_pattern, window_titles, scorer=fuzz.partial_ratio 148 | ) 149 | log(f"Best fuzzy match: '{best_match_title}' with score {score}") 150 | 151 | # Only return if the score is above the threshold 152 | if score >= threshold: 153 | # Find the window with the matching title 154 | for window in windows: 155 | if window["title"] == best_match_title: 156 | return window 157 | 158 | return None 159 | 160 | 161 | def take_screenshot( 162 | title_pattern: str = None, 163 | use_regex: bool = False, 164 | threshold: int = 60, 165 | save_to_downloads: bool = False, 166 | ) -> Image: 167 | """ 168 | Take screenshots based on the specified title pattern and save them to the downloads directory with absolute paths returned. 169 | If no title pattern is provided, take screenshot of entire screen. 
170 | 171 | Args: 172 | title_pattern: Pattern to match window title, if None, take screenshot of entire screen 173 | use_regex: If True, treat the pattern as a regex, otherwise best match with fuzzy matching 174 | save_to_downloads: If True, save the screenshot to the downloads directory and return the absolute path 175 | threshold: Minimum score (0-100) required for a fuzzy match 176 | 177 | Returns: 178 | Always returns a single screenshot as an MCP Image object. A "content type image not supported" message means the preview isn't supported, but the Image object is still returned. 179 | """ 180 | try: 181 | all_windows = gw.getAllWindows() 182 | 183 | # Convert to list of dictionaries for _find_matching_window 184 | windows = [] 185 | for window in all_windows: 186 | if window.title: # Only include windows with titles 187 | windows.append( 188 | { 189 | "title": window.title, 190 | "window_obj": window, # Store the actual window object 191 | } 192 | ) 193 | 194 | print(f"Found {len(windows)} windows") 195 | window = _find_matching_window(windows, title_pattern, use_regex, threshold) 196 | window = window["window_obj"] if window else None 197 | 198 | # Store the currently active window 199 | current_active_window = gw.getActiveWindow() 200 | 201 | # Take the screenshot 202 | if not window: 203 | print("No matching window found, taking screenshot of entire screen") 204 | screenshot = _mss_screenshot() 205 | else: 206 | print(f"Taking screenshot of window: {window.title}") 207 | # Activate the window and wait for it to be fully in focus 208 | window.activate() 209 | pyautogui.sleep(0.5) # Wait for 0.5 seconds to ensure window is active 210 | screenshot = _mss_screenshot( 211 | region=(window.left, window.top, window.width, window.height) 212 | ) 213 | # Restore the previously active window 214 | if current_active_window: 215 | current_active_window.activate() 216 | pyautogui.sleep(0.2) # Wait a bit to ensure previous window is restored 217 | 218 | # Create temp directory 219 | temp_dir = 
Path(tempfile.mkdtemp()) 220 | 221 | # Save screenshot and get filepath 222 | filepath, _ = save_image_to_downloads( 223 | screenshot, prefix="screenshot", directory=temp_dir 224 | ) 225 | 226 | # Create Image object from filepath 227 | image = Image(filepath) 228 | 229 | # Copy from temp to downloads 230 | if save_to_downloads: 231 | print("Copying screenshot from temp to downloads") 232 | shutil.copy(filepath, get_downloads_dir()) 233 | 234 | return image # MCP Image object 235 | 236 | except Exception as e: 237 | print(f"Error taking screenshot: {str(e)}") 238 | return f"Error taking screenshot: {str(e)}" 239 | 240 | 241 | def get_ocr_from_screenshot( 242 | title_pattern: str = None, 243 | use_regex: bool = False, 244 | threshold: int = 60, 245 | scale_percent: int = 100, 246 | ) -> any: 247 | """ 248 | Get OCR text from the specified title pattern and save them to the downloads directory with absolute paths returned. 249 | If no title pattern is provided, get all Text on the screen. 
250 | 251 | Args: 252 | title_pattern: Pattern to match window title, if None, read all text on the screen 253 | use_regex: If True, treat the pattern as a regex, otherwise best match with fuzzy matching 254 | threshold: Minimum score (0-100) required for a fuzzy match 255 | scale_percent: Percentage to scale the image to before OCR (100 keeps the original size) 256 | 257 | Returns: 258 | List of (box, text, score) tuples with box corners offset by the window position, or an error string on failure 259 | """ 260 | try: 261 | 262 | all_windows = gw.getAllWindows() 263 | 264 | # Convert to list of dictionaries for _find_matching_window 265 | windows = [] 266 | for window in all_windows: 267 | if window.title: # Only include windows with titles 268 | windows.append( 269 | { 270 | "title": window.title, 271 | "window_obj": window, # Store the actual window object 272 | } 273 | ) 274 | 275 | log(f"Found {len(windows)} windows") 276 | window = _find_matching_window(windows, title_pattern, use_regex, threshold) 277 | window = window["window_obj"] if window else None 278 | 279 | # Store the currently active window 280 | current_active_window = gw.getActiveWindow() 281 | 282 | # Take the screenshot 283 | if not window: 284 | log("No matching window found, taking screenshot of entire screen") 285 | screenshot = _mss_screenshot() 286 | else: 287 | log(f"Taking screenshot of window: {window.title}") 288 | # Activate the window and wait for it to be fully in focus 289 | window.activate() 290 | pyautogui.sleep(0.5) # Wait for 0.5 seconds to ensure window is active 291 | screenshot = _mss_screenshot( 292 | region=(window.left, window.top, window.width, window.height) 293 | ) 294 | # Restore the previously active window 295 | if current_active_window: 296 | current_active_window.activate() 297 | pyautogui.sleep(0.2) # Wait a bit to ensure previous window is restored 298 | 299 | # Create temp directory 300 | temp_dir = Path(tempfile.mkdtemp()) 301 | 302 | # Save screenshot and get filepath 303 | filepath, _ = 
save_image_to_downloads( 304 | screenshot, prefix="screenshot", directory=temp_dir 305 | ) 306 | 307 | # Create Image object from filepath 308 | image = Image(filepath) 309 | 310 | # Copy from temp to downloads 311 | if False: 312 | log("Copying screenshot from temp to downloads") 313 | shutil.copy(filepath, get_downloads_dir()) 314 | 315 | image_path = image.path 316 | img = cv2.imread(image_path) 317 | 318 | # Downscale the image before processing 319 | width = int(img.shape[1] * scale_percent / 100) 320 | height = int(img.shape[0] * scale_percent / 100) 321 | dim = (width, height) 322 | resized_img = cv2.resize(img, dim, interpolation=cv2.INTER_AREA) 323 | # save resized image to pwd 324 | # cv2.imwrite("resized_img.png", resized_img) 325 | engine = RapidOCR() 326 | vis = VisRes() 327 | 328 | result, elapse_list = engine(resized_img) 329 | boxes, txts, scores = list(zip(*result)) 330 | boxes = [[[x + (window.left if window else 0), y + (window.top if window else 0)] for x, y in box] for box in boxes] # no offset for a full-screen capture 331 | zipped_results = list(zip(boxes, txts, scores)) 332 | 333 | return zipped_results 334 | 335 | except Exception as e: 336 | log(f"Error getting UI elements: {str(e)}") 337 | import traceback 338 | 339 | stack_trace = traceback.format_exc() 340 | log(f"Stack trace:\n{stack_trace}") 341 | return f"Error getting UI elements: {str(e)}\nStack trace:\n{stack_trace}" 342 | 343 | 344 | import json 345 | 346 | print(json.dumps(get_ocr_from_screenshot("chrome"))) 347 | ``` -------------------------------------------------------------------------------- /src/computer_control_mcp/core.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python3 2 | """ 3 | Computer Control MCP - Core Implementation 4 | A compact Model Context Protocol server that provides computer control capabilities 5 | using PyAutoGUI for mouse/keyboard control. 
6 | """ 7 | 8 | import json 9 | import shutil 10 | import sys 11 | import os 12 | from typing import Dict, Any, List, Optional, Tuple 13 | from io import BytesIO 14 | import re 15 | import asyncio 16 | import uuid 17 | import datetime 18 | from pathlib import Path 19 | import tempfile 20 | from typing import Union 21 | 22 | # --- Third-party dependencies --- 23 | import pyautogui 24 | from mcp.server.fastmcp import FastMCP, Image 25 | import mss 26 | from PIL import Image as PILImage 27 | 28 | try: 29 | import pywinctl as gw 30 | except (NotImplementedError, ImportError): 31 | import pygetwindow as gw 32 | from fuzzywuzzy import fuzz, process 33 | 34 | import cv2 35 | from rapidocr import RapidOCR 36 | 37 | from pydantic import BaseModel 38 | 39 | BaseModel.model_config = {"arbitrary_types_allowed": True} 40 | 41 | engine = RapidOCR() 42 | 43 | 44 | DEBUG = True # Set to False in production 45 | RELOAD_ENABLED = True # Set to False to disable auto-reload 46 | 47 | # Create FastMCP server instance at module level 48 | mcp = FastMCP("ComputerControlMCP") 49 | 50 | 51 | # Determine mode automatically 52 | IS_DEVELOPMENT = os.getenv("ENV") == "development" 53 | 54 | 55 | def log(message: str) -> None: 56 | """Log to stderr; stdout is left free for the MCP stdio transport.""" 57 | if IS_DEVELOPMENT: 58 | # In dev, write to stderr 59 | print(f"[DEV] {message}", file=sys.stderr) 60 | else: 61 | # In production, also write to stderr so log lines never mix with protocol messages on stdout 62 | print(f"[PROD] {message}", file=sys.stderr) 63 | # or append to a file: open("app.log", "a").write(message+"\n") 64 | 65 | 66 | def get_downloads_dir() -> Path: 67 | """Get the OS downloads directory.""" 68 | if os.name == "nt": # Windows 69 | import winreg 70 | 71 | sub_key = r"SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders" 72 | downloads_guid = "{374DE290-123F-4565-9164-39C4925E467B}" 73 | with winreg.OpenKey(winreg.HKEY_CURRENT_USER, sub_key) as key: 74 | downloads_dir = winreg.QueryValueEx(key, 
downloads_guid)[0] 75 | return Path(downloads_dir) 76 | else: # macOS, Linux, etc. 77 | return Path.home() / "Downloads" 78 | 79 | 80 | def _mss_screenshot(region=None): 81 | """Take a screenshot using mss and return PIL Image. 82 | 83 | Args: 84 | region: Optional tuple (left, top, width, height) for region capture 85 | 86 | Returns: 87 | PIL Image object 88 | """ 89 | with mss.mss() as sct: 90 | if region is None: 91 | # Full screen screenshot 92 | monitor = sct.monitors[0] # All monitors combined 93 | else: 94 | # Region screenshot 95 | left, top, width, height = region 96 | monitor = { 97 | "left": left, 98 | "top": top, 99 | "width": width, 100 | "height": height, 101 | } 102 | 103 | screenshot = sct.grab(monitor) 104 | # Convert to PIL Image 105 | return PILImage.frombytes( 106 | "RGB", screenshot.size, screenshot.bgra, "raw", "BGRX" 107 | ) 108 | 109 | 110 | def save_image_to_downloads( 111 | image, prefix: str = "screenshot", directory: Path = None 112 | ) -> Tuple[str, bytes]: 113 | """Save an image to the downloads directory and return its absolute path. 
114 | 115 | Args: 116 | image: Either a PIL Image object or MCP Image object 117 | prefix: Prefix for the filename (default: 'screenshot') 118 | directory: Optional directory to save the image to 119 | 120 | Returns: 121 | Tuple of (absolute_path, image_data_bytes) 122 | """ 123 | # Create a unique filename with timestamp 124 | timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 125 | unique_id = str(uuid.uuid4())[:8] 126 | filename = f"{prefix}_{timestamp}_{unique_id}.png" 127 | 128 | # Get downloads directory 129 | downloads_dir = directory or get_downloads_dir() 130 | filepath = downloads_dir / filename 131 | 132 | # Handle different image types 133 | if hasattr(image, "save"): # PIL Image 134 | image.save(filepath) 135 | # Also get the bytes for returning 136 | img_byte_arr = BytesIO() 137 | image.save(img_byte_arr, format="PNG") 138 | img_bytes = img_byte_arr.getvalue() 139 | elif hasattr(image, "data"): # MCP Image 140 | img_bytes = image.data 141 | with open(filepath, "wb") as f: 142 | f.write(img_bytes) 143 | else: 144 | raise TypeError("Unsupported image type") 145 | 146 | log(f"Saved image to {filepath}") 147 | return str(filepath.absolute()), img_bytes 148 | 149 | 150 | def _find_matching_window( 151 | windows: any, 152 | title_pattern: str = None, 153 | use_regex: bool = False, 154 | threshold: int = 10, 155 | ) -> Optional[Dict[str, Any]]: 156 | """Helper function to find a matching window based on title pattern. 
157 | 158 | Args: 159 | windows: List of window dictionaries 160 | title_pattern: Pattern to match window title 161 | use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching 162 | threshold: Minimum score (0-100) required for a fuzzy match 163 | 164 | Returns: 165 | The best matching window or None if no match found 166 | """ 167 | if not title_pattern: 168 | log("No title pattern provided, returning None") 169 | return None 170 | 171 | # For regex matching 172 | if use_regex: 173 | for window in windows: 174 | if re.search(title_pattern, window["title"], re.IGNORECASE): 175 | log(f"Regex match found: {window['title']}") 176 | return window 177 | return None 178 | 179 | # For fuzzy matching using fuzzywuzzy 180 | # Extract all window titles 181 | window_titles = [window["title"] for window in windows] 182 | 183 | # Use process.extractOne to find the best match 184 | best_match_title, score = process.extractOne( 185 | title_pattern, window_titles, scorer=fuzz.partial_ratio 186 | ) 187 | log(f"Best fuzzy match: '{best_match_title}' with score {score}") 188 | 189 | # Only return if the score is above the threshold 190 | if score >= threshold: 191 | # Find the window with the matching title 192 | for window in windows: 193 | if window["title"] == best_match_title: 194 | return window 195 | 196 | return None 197 | 198 | 199 | # --- MCP Function Handlers --- 200 | 201 | 202 | @mcp.tool() 203 | def click_screen(x: int, y: int) -> str: 204 | """Click at the specified screen coordinates.""" 205 | try: 206 | pyautogui.click(x=x, y=y) 207 | return f"Successfully clicked at coordinates ({x}, {y})" 208 | except Exception as e: 209 | return f"Error clicking at coordinates ({x}, {y}): {str(e)}" 210 | 211 | 212 | @mcp.tool() 213 | def get_screen_size() -> Dict[str, Any]: 214 | """Get the current screen resolution.""" 215 | try: 216 | width, height = pyautogui.size() 217 | return { 218 | "width": width, 219 | "height": height, 220 | "message": f"Screen size: 
{width}x{height}", 221 | } 222 | except Exception as e: 223 | return {"error": str(e), "message": f"Error getting screen size: {str(e)}"} 224 | 225 | 226 | @mcp.tool() 227 | def type_text(text: str) -> str: 228 | """Type the specified text at the current cursor position.""" 229 | try: 230 | pyautogui.typewrite(text) 231 | return f"Successfully typed text: {text}" 232 | except Exception as e: 233 | return f"Error typing text: {str(e)}" 234 | 235 | 236 | @mcp.tool() 237 | def take_screenshot( 238 | title_pattern: str = None, 239 | use_regex: bool = False, 240 | threshold: int = 10, 241 | scale_percent_for_ocr: int = None, 242 | save_to_downloads: bool = False, 243 | ) -> Image: 244 | """ 245 | Get a screenshot as an MCP Image object. If no title pattern is provided, capture the entire screen. 246 | 247 | Args: 248 | title_pattern: Pattern to match window title, if None, take screenshot of entire screen 249 | use_regex: If True, treat the pattern as a regex, otherwise best match with fuzzy matching 250 | threshold: Minimum score (0-100) required for a fuzzy match 251 | scale_percent_for_ocr: Accepted for parity with take_screenshot_with_ocr; not used by this tool 252 | save_to_downloads: If True, also copy the screenshot to the downloads directory 253 | 254 | Returns: 255 | A single screenshot as an MCP Image object. A "content type image not supported" message means the preview isn't supported, but the Image object was still returned successfully. 
256 | """ 257 | try: 258 | all_windows = gw.getAllWindows() 259 | 260 | # Convert to list of dictionaries for _find_matching_window 261 | windows = [] 262 | for window in all_windows: 263 | if window.title: # Only include windows with titles 264 | windows.append( 265 | { 266 | "title": window.title, 267 | "window_obj": window, # Store the actual window object 268 | } 269 | ) 270 | 271 | log(f"Found {len(windows)} windows") 272 | window = _find_matching_window(windows, title_pattern, use_regex, threshold) 273 | window = window["window_obj"] if window else None 274 | 275 | import ctypes 276 | import time 277 | 278 | def force_activate(window): 279 | """Force a window to the foreground on Windows.""" 280 | try: 281 | hwnd = window._hWnd # pywinctl window handle 282 | 283 | # Restore if minimized 284 | if window.isMinimized: 285 | window.restore() 286 | time.sleep(0.1) 287 | 288 | # Bring to top and set foreground 289 | ctypes.windll.user32.SetForegroundWindow(hwnd) 290 | ctypes.windll.user32.BringWindowToTop(hwnd) 291 | window.activate() # fallback 292 | time.sleep(0.3) # wait for OS to update 293 | 294 | except Exception as e: 295 | print(f"Warning: Could not force window: {e}", file=sys.stderr) 296 | 297 | # Take the screenshot 298 | if not window: 299 | log("No matching window found, taking screenshot of entire screen") 300 | screenshot = _mss_screenshot() 301 | else: 302 | try: 303 | # Re-fetch window handle to ensure it's valid 304 | window = gw.getWindowsWithTitle(window.title)[0] 305 | current_active_window = gw.getActiveWindow() 306 | log(f"Taking screenshot of window: {window.title}") 307 | 308 | if sys.platform == "win32": 309 | force_activate(window) 310 | else: 311 | window.activate() 312 | pyautogui.sleep(0.5) # Give Windows time to focus 313 | 314 | screen_width, screen_height = pyautogui.size() 315 | 316 | screenshot = _mss_screenshot( 317 | region=( 318 | max(window.left, 0), 319 | max(window.top, 0), 320 | min(window.width, screen_width), 321 | 
min(window.height, screen_height), 322 | ) 323 | ) 324 | 325 | # Restore previously active window 326 | if current_active_window and current_active_window != window: 327 | try: 328 | if sys.platform == "win32": 329 | force_activate(current_active_window) 330 | else: 331 | current_active_window.activate() 332 | pyautogui.sleep(0.2) 333 | except Exception as e: 334 | log(f"Error restoring previous window: {str(e)}") 335 | except Exception as e: 336 | log(f"Error taking screenshot of window: {str(e)}") 337 | screenshot = _mss_screenshot() # fallback to full screen 338 | 339 | # Create temp directory 340 | temp_dir = Path(tempfile.mkdtemp()) 341 | 342 | # Save screenshot and get filepath 343 | filepath, _ = save_image_to_downloads( 344 | screenshot, prefix="screenshot", directory=temp_dir 345 | ) 346 | 347 | # Create Image object from filepath 348 | image = Image(filepath) 349 | 350 | if save_to_downloads: 351 | log("Copying screenshot from temp to downloads") 352 | shutil.copy(filepath, get_downloads_dir()) 353 | 354 | return image # MCP Image object 355 | 356 | except Exception as e: 357 | log(f"Error in screenshot or getting UI elements: {str(e)}") 358 | import traceback 359 | 360 | stack_trace = traceback.format_exc() 361 | log(f"Stack trace:\n{stack_trace}") 362 | return f"Error in screenshot or getting UI elements: {str(e)}\nStack trace:\n{stack_trace}" 363 | 364 | 365 | def is_low_spec_pc() -> bool: 366 | try: 367 | import psutil 368 | 369 | cpu_low = psutil.cpu_count(logical=False) < 4 370 | ram_low = psutil.virtual_memory().total < 8 * 1024**3 371 | return cpu_low or ram_low 372 | except Exception: 373 | # Fallback if psutil not available or info unavailable 374 | return False 375 | 376 | 377 | @mcp.tool() 378 | def take_screenshot_with_ocr( 379 | title_pattern: str = None, 380 | use_regex: bool = False, 381 | threshold: int = 10, 382 | scale_percent_for_ocr: int = None, 383 | save_to_downloads: bool = False, 384 | ) -> str: 385 | """ 386 | Get OCR text from 
a screenshot of the window matching the specified title pattern, returned as a string of (box, text, confidence) items of type Tuple[List[List[int]], str, float]. Coordinates are intended to be absolute screen coordinates (the window offset from the true (0, 0) of the screen added to the OCR coordinates) so that clicks land on target; clicking the middle of an OCR box is recommended. If no title pattern is provided, the entire screen is captured and all text on it is returned. Note that OCR takes around 20 seconds on a mid-spec PC at 1080p resolution. 387 | 388 | Args: 389 | title_pattern: Pattern to match window title, if None, take screenshot of entire screen 390 | use_regex: If True, treat the pattern as a regex, otherwise best match with fuzzy matching 391 | threshold: Minimum score (0-100) required for a fuzzy match 392 | scale_percent_for_ocr: Percentage to scale the image down to before OCR; rarely needed unless the PC is extremely old or slow 393 | save_to_downloads: If True, also copy the screenshot to the downloads directory 394 | 395 | Returns: 396 | A string listing the detected text items, each formatted as ([4 corners of box], text, confidence), or "No text found" when nothing is detected. 
397 | """ 398 | try: 399 | all_windows = gw.getAllWindows() 400 | 401 | # Convert to list of dictionaries for _find_matching_window 402 | windows = [] 403 | for window in all_windows: 404 | if window.title: # Only include windows with titles 405 | windows.append( 406 | { 407 | "title": window.title, 408 | "window_obj": window, # Store the actual window object 409 | } 410 | ) 411 | 412 | log(f"Found {len(windows)} windows") 413 | window = _find_matching_window(windows, title_pattern, use_regex, threshold) 414 | window = window["window_obj"] if window else None 415 | 416 | # Store the currently active window 417 | 418 | # Take the screenshot 419 | if not window: 420 | log("No matching window found, taking screenshot of entire screen") 421 | screenshot = _mss_screenshot() 422 | else: 423 | current_active_window = gw.getActiveWindow() 424 | log(f"Taking screenshot of window: {window.title}") 425 | # Activate the window and wait for it to be fully in focus 426 | try: 427 | window.activate() 428 | pyautogui.sleep(0.5) # Wait for 0.5 seconds to ensure window is active 429 | screenshot = _mss_screenshot( 430 | region=(window.left, window.top, window.width, window.height) 431 | ) 432 | # Restore the previously active window 433 | if current_active_window: 434 | try: 435 | current_active_window.activate() 436 | pyautogui.sleep( 437 | 0.2 438 | ) # Wait a bit to ensure previous window is restored 439 | except Exception as e: 440 | log(f"Error restoring previous window: {str(e)}") 441 | except Exception as e: 442 | log(f"Error taking screenshot of window: {str(e)}") 443 | return f"Error taking screenshot of window: {str(e)}" 444 | 445 | # Create temp directory 446 | temp_dir = Path(tempfile.mkdtemp()) 447 | 448 | # Save screenshot and get filepath 449 | filepath, _ = save_image_to_downloads( 450 | screenshot, prefix="screenshot", directory=temp_dir 451 | ) 452 | 453 | # Create Image object from filepath 454 | image = Image(filepath) 455 | 456 | # Copy from temp to downloads 
457 | if save_to_downloads: 458 | log("Copying screenshot from temp to downloads") 459 | shutil.copy(filepath, get_downloads_dir()) 460 | 461 | image_path = image.path 462 | img = cv2.imread(image_path) 463 | 464 | if scale_percent_for_ocr is None: 465 | # Calculate percent to scale height to 360 pixels 466 | scale_percent_for_ocr = 100 # 360 / img.shape[0] * 100 467 | 468 | # Lower down resolution before processing 469 | width = int(img.shape[1] * scale_percent_for_ocr / 100) 470 | height = int(img.shape[0] * scale_percent_for_ocr / 100) 471 | dim = (width, height) 472 | resized_img = cv2.resize(img, dim, interpolation=cv2.INTER_AREA) 473 | # save resized image to pwd 474 | # cv2.imwrite("resized_img.png", resized_img) 475 | 476 | output = engine(resized_img) 477 | boxes = output.boxes 478 | txts = output.txts 479 | scores = output.scores 480 | zipped_results = list(zip(boxes, txts, scores)) 481 | zipped_results = [ 482 | ( 483 | box.tolist(), 484 | text, 485 | float(score), 486 | ) # convert np.array -> list, ensure score is float 487 | for box, text, score in zipped_results 488 | ] 489 | log(f"Found {len(zipped_results)} text items in OCR result.") 490 | log(f"First 5 items: {zipped_results[:5]}") 491 | return ( 492 | ",\n".join([str(item) for item in zipped_results]) 493 | if zipped_results 494 | else "No text found" 495 | ) 496 | 497 | except Exception as e: 498 | log(f"Error in screenshot or getting UI elements: {str(e)}") 499 | import traceback 500 | 501 | stack_trace = traceback.format_exc() 502 | log(f"Stack trace:\n{stack_trace}") 503 | return f"Error in screenshot or getting UI elements: {str(e)}\nStack trace:\n{stack_trace}" 504 | 505 | 506 | @mcp.tool() 507 | def move_mouse(x: int, y: int) -> str: 508 | """Move the mouse to the specified screen coordinates.""" 509 | try: 510 | pyautogui.moveTo(x=x, y=y) 511 | return f"Successfully moved mouse to coordinates ({x}, {y})" 512 | except Exception as e: 513 | return f"Error moving mouse to coordinates ({x}, 
{y}): {str(e)}"


@mcp.tool()
def mouse_down(button: str = "left") -> str:
    """Hold down a mouse button ('left', 'right', 'middle')."""
    try:
        pyautogui.mouseDown(button=button)
        return f"Held down {button} mouse button"
    except Exception as e:
        return f"Error holding {button} mouse button: {str(e)}"


@mcp.tool()
def mouse_up(button: str = "left") -> str:
    """Release a mouse button ('left', 'right', 'middle')."""
    try:
        pyautogui.mouseUp(button=button)
        return f"Released {button} mouse button"
    except Exception as e:
        return f"Error releasing {button} mouse button: {str(e)}"


@mcp.tool()
async def drag_mouse(
    from_x: int, from_y: int, to_x: int, to_y: int, duration: float = 0.5
) -> str:
    """
    Drag the mouse from one position to another.

    Args:
        from_x: Starting X coordinate
        from_y: Starting Y coordinate
        to_x: Ending X coordinate
        to_y: Ending Y coordinate
        duration: Duration of the drag in seconds (default: 0.5)

    Returns:
        Success or error message
    """
    try:
        # First move to the starting position
        pyautogui.moveTo(x=from_x, y=from_y)
        # Then drag to the destination
        log("starting drag")
        await asyncio.to_thread(pyautogui.dragTo, x=to_x, y=to_y, duration=duration)
        log("done drag")
        return f"Successfully dragged from ({from_x}, {from_y}) to ({to_x}, {to_y})"
    except Exception as e:
        return f"Error dragging from ({from_x}, {from_y}) to ({to_x}, {to_y}): {str(e)}"


import pyautogui
from typing import Union, List


@mcp.tool()
def key_down(key: str) -> str:
    """Hold down a specific keyboard key until released."""
    try:
        pyautogui.keyDown(key)
        return f"Held down key: {key}"
    except Exception as e:
        return f"Error holding key {key}: {str(e)}"


@mcp.tool()
def key_up(key: str) -> str:
    """Release a specific keyboard key."""
    try:
        pyautogui.keyUp(key)
        return f"Released key: {key}"
    except Exception as e:
        return f"Error releasing key {key}: {str(e)}"


@mcp.tool()
def press_keys(keys: Union[str, List[Union[str, List[str]]]]) -> str:
    """
    Press keyboard keys.

    Args:
        keys:
            - Single key as string (e.g., "enter")
            - Sequence of keys as list (e.g., ["a", "b", "c"])
            - Key combinations as nested list (e.g., [["ctrl", "c"], ["alt", "tab"]])

    Examples:
        press_keys("enter")
        press_keys(["a", "b", "c"])
        press_keys([["ctrl", "c"], ["alt", "tab"]])
    """
    try:
        if isinstance(keys, str):
            # Single key
            pyautogui.press(keys)
            return f"Pressed single key: {keys}"

        elif isinstance(keys, list):
            for item in keys:
                if isinstance(item, str):
                    # Sequential key press
                    pyautogui.press(item)
                elif isinstance(item, list):
                    # Key combination (e.g., ctrl+c)
                    pyautogui.hotkey(*item)
                else:
                    return f"Invalid key format: {item}"
            return f"Successfully pressed keys sequence: {keys}"

        else:
            return "Invalid input: must be str or list"

    except Exception as e:
        return f"Error pressing keys {keys}: {str(e)}"


@mcp.tool()
def list_windows() -> List[Dict[str, Any]]:
    """List all open windows on the system."""
    try:
        windows = gw.getAllWindows()
        result = []
        for window in windows:
            if window.title:  # Only include windows with titles
                result.append(
                    {
                        "title": window.title,
                        "left": window.left,
                        "top": window.top,
                        "width": window.width,
                        "height": window.height,
                        "is_active": window.isActive,
                        "is_visible": window.visible,
                        "is_minimized": window.isMinimized,
                        "is_maximized": window.isMaximized,
                        # "screenshot": pyautogui.screenshot(
                        #     region=(
                        #         window.left,
                        #         window.top,
                        #         window.width,
                        #         window.height,
                        #     )
                        # ),
                    }
                )
        return result
    except Exception as e:
        log(f"Error listing windows: {str(e)}")
        return [{"error": str(e)}]


@mcp.tool()
def activate_window(
    title_pattern: str, use_regex: bool = False, threshold: int = 60
) -> str:
    """
    Activate a window (bring it to the foreground) by matching its title.

    Args:
        title_pattern: Pattern to match window title
        use_regex: If True, treat the pattern as a regex, otherwise use fuzzy matching
        threshold: Minimum score (0-100) required for a fuzzy match

    Returns:
        Success or error message
    """
    try:
        # Get all windows
        all_windows = gw.getAllWindows()

        # Convert to list of dictionaries for _find_matching_window
        windows = []
        for window in all_windows:
            if window.title:  # Only include windows with titles
                windows.append(
                    {
                        "title": window.title,
                        "window_obj": window,  # Store the actual window object
                    }
                )

        # Find matching window using our improved function
        matched_window_dict = _find_matching_window(
            windows, title_pattern, use_regex, threshold
        )

        if not matched_window_dict:
            log(f"No window found matching pattern: {title_pattern}")
            return f"Error: No window found matching pattern: {title_pattern}"

        # Get the actual window object
        matched_window = matched_window_dict["window_obj"]

        # Activate the window
        matched_window.activate()

        return f"Successfully activated window: '{matched_window.title}'"
    except Exception as e:
        log(f"Error activating window: {str(e)}")
        return f"Error activating window: {str(e)}"


def main():
    """Main entry point for the MCP server."""
    pyautogui.FAILSAFE = True

    try:
        # Run the server
        log("Computer Control MCP Server Started...")
        mcp.run()

    except KeyboardInterrupt:
        log("Server shutting down...")
    except Exception as e:
        log(f"Error: {str(e)}")


if __name__ == "__main__":
    main()
```
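
The branching in `press_keys` above (a plain string is pressed once, a list is walked in order, and a nested list is sent as a key combination) can be exercised without a display by swapping the input library for a recorder. The following is only a minimal sketch of that idea: `RecordingBackend` and `dispatch_keys` are hypothetical names introduced for illustration and are not part of the package, and the recorder merely imitates the `press` and `hotkey` calls that `press_keys` makes on `pyautogui`.

```python
from typing import List, Tuple, Union

Keys = Union[str, List[Union[str, List[str]]]]


class RecordingBackend:
    """Hypothetical stand-in for pyautogui: records calls instead of sending input."""

    def __init__(self) -> None:
        self.calls: List[Tuple] = []

    def press(self, key: str) -> None:
        self.calls.append(("press", key))

    def hotkey(self, *keys: str) -> None:
        self.calls.append(("hotkey", keys))


def dispatch_keys(keys: Keys, backend) -> str:
    """Mirror the branching of press_keys, with the backend passed in (an assumption)."""
    if isinstance(keys, str):
        # Single key
        backend.press(keys)
        return f"Pressed single key: {keys}"
    if isinstance(keys, list):
        for item in keys:
            if isinstance(item, str):
                # Sequential key press
                backend.press(item)
            elif isinstance(item, list):
                # Key combination, e.g. ["ctrl", "c"]
                backend.hotkey(*item)
            else:
                # Stops here; keys handled earlier in the list were already sent
                return f"Invalid key format: {item}"
        return f"Successfully pressed keys sequence: {keys}"
    return "Invalid input: must be str or list"


backend = RecordingBackend()
message = dispatch_keys(["a", ["ctrl", "c"]], backend)
print(backend.calls)
print(message)
```

As the sketch makes visible, an invalid item in the middle of a list ends the call early, after any earlier keys in that list have already been sent, so callers that need all-or-nothing behavior would have to validate the list before dispatching.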