# Directory Structure ``` ├── .gitignore ├── .python-version ├── assets │ └── web-ui.png ├── Dockerfile ├── LICENSE ├── pyproject.toml ├── README.md ├── smithery.yaml ├── src │ └── mcp_server_browser_use │ ├── __init__.py │ ├── agent │ │ ├── __init__.py │ │ ├── custom_agent.py │ │ ├── custom_massage_manager.py │ │ ├── custom_prompts.py │ │ └── custom_views.py │ ├── browser │ │ ├── __init__.py │ │ ├── custom_browser.py │ │ └── custom_context.py │ ├── controller │ │ ├── __init__.py │ │ └── custom_controller.py │ ├── server.py │ └── utils │ ├── __init__.py │ ├── agent_state.py │ ├── deep_research.py │ ├── llm.py │ └── utils.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.11 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig # Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python # Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,macos,python ### macOS ### # General .DS_Store .AppleDouble .LSOverride # Icon must end with two \r Icon # Thumbnails ._* # Files that might appear in the root of a volume .DocumentRevisions-V100 .fseventsd .Spotlight-V100 .TemporaryItems .Trashes .VolumeIcon.icns .com.apple.timemachine.donotpresent # Directories potentially created on remote AFP share .AppleDB .AppleDesktop Network Trash Folder Temporary Items .apdisk ### macOS Patch ### # iCloud generated files *.icloud ### Python ### # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] *$py.class # C extensions *.so # Distribution / packaging .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ share/python-wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec # Installer logs pip-log.txt pip-delete-this-directory.txt # Unit test / coverage reports htmlcov/ .tox/ .nox/ .coverage .coverage.* .cache nosetests.xml coverage.xml *.cover *.py,cover .hypothesis/ .pytest_cache/ cover/ # Translations *.mo *.pot # Django stuff: *.log local_settings.py db.sqlite3 db.sqlite3-journal # Flask stuff: instance/ .webassets-cache # Scrapy stuff: .scrapy # Sphinx documentation docs/_build/ # PyBuilder .pybuilder/ target/ # Jupyter Notebook .ipynb_checkpoints # IPython profile_default/ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: # .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. #Pipfile.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. 
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control #poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. #pdm.lock # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/#use-with-ide .pdm.toml # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ # Celery stuff celerybeat-schedule celerybeat.pid # SageMath parsed files *.sage.py # Environments .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # Spyder project settings .spyderproject .spyproject # Rope project settings .ropeproject # mkdocs documentation /site # mypy .mypy_cache/ .dmypy.json dmypy.json # Pyre type checker .pyre/ # pytype static type analyzer .pytype/ # Cython debug symbols cython_debug/ # PyCharm # JetBrains specific template is maintained in a separate JetBrains.gitignore that can # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ ### Python Patch ### # Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration poetry.toml # ruff .ruff_cache/ # LSP config files pyrightconfig.json ### VisualStudioCode ### .vscode/* !.vscode/settings.json !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json !.vscode/*.code-snippets # Local History for Visual Studio Code .history/ # Built Visual Studio Code Extensions *.vsix ### VisualStudioCode Patch ### # Ignore all local history of files .history .ionide # End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python # Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option) agent_history.gif trace.json recording.mp4 ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown <img src="./assets/web-ui.png" alt="Browser Use Web UI" width="full"/> <br/> # browser-use MCP server [](https://docs.browser-use.com) [](LICENSE) > **Project Note**: This MCP server implementation builds upon the [browser-use/web-ui](https://github.com/browser-use/web-ui) foundation. Core browser automation logic and configuration patterns are adapted from the original project. AI-driven browser automation server implementing the Model Context Protocol (MCP) for natural language browser control. 
<a href="https://glama.ai/mcp/servers/dz6dy5hw59"><img width="380" height="200" src="https://glama.ai/mcp/servers/dz6dy5hw59/badge" alt="Browser-Use Server MCP server" /></a> ## Features - 🧠 **MCP Integration** - Full protocol implementation for AI agent communication - 🌐 **Browser Automation** - Page navigation, form filling, and element interaction - 👁️ **Visual Understanding** - Screenshot analysis and vision-based interactions - 🔄 **State Persistence** - Maintain browser sessions between tasks - 🔌 **Multi-LLM Support** - OpenAI, Anthropic, Azure, DeepSeek integration ## Quick Start ### Prerequisites - Python 3.11 or higher - uv (fast Python package installer) - Chrome/Chromium browser ### Installation #### Claude Desktop On MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json` On Windows: `%APPDATA%/Claude/claude_desktop_config.json` ```json "mcpServers": { "browser-use": { "command": "uvx", "args": [ "mcp-server-browser-use", ], "env": { "OPENROUTER_API_KEY": "", "OPENROUTER_ENDPOINT": "https://openrouter.ai/api/v1", "OPENAI_ENDPOINT": "https://api.openai.com/v1", "OPENAI_API_KEY": "", "ANTHROPIC_ENDPOINT": "https://api.anthropic.com", "ANTHROPIC_API_KEY": "", "GOOGLE_API_KEY": "", "AZURE_OPENAI_ENDPOINT": "", "AZURE_OPENAI_API_KEY": "", "DEEPSEEK_ENDPOINT": "https://api.deepseek.com", "DEEPSEEK_API_KEY": "", "MISTRAL_API_KEY": "", "MISTRAL_ENDPOINT": "https://api.mistral.ai/v1", "OLLAMA_ENDPOINT": "http://localhost:11434", "ANONYMIZED_TELEMETRY": "true", "BROWSER_USE_LOGGING_LEVEL": "info", "CHROME_PATH": "", "CHROME_USER_DATA": "", "CHROME_DEBUGGING_PORT": "9222", "CHROME_DEBUGGING_HOST": "localhost", "CHROME_PERSISTENT_SESSION": "false", "BROWSER_HEADLESS": "false", "BROWSER_DISABLE_SECURITY": "false", "BROWSER_WINDOW_WIDTH": "1280", "BROWSER_WINDOW_HEIGHT": "720", "BROWSER_TRACE_PATH": "trace.json", "BROWSER_RECORDING_PATH": "recording.mp4", "RESOLUTION": "1920x1080x24", "RESOLUTION_WIDTH": "1920", "RESOLUTION_HEIGHT": "1080", "VNC_PASSWORD": "youvncpassword", "MCP_MODEL_PROVIDER": "anthropic", "MCP_MODEL_NAME": "claude-3-5-sonnet-20241022", "MCP_TEMPERATURE": "0.3", "MCP_MAX_STEPS": "30", "MCP_USE_VISION": "true", "MCP_MAX_ACTIONS_PER_STEP": "5", "MCP_TOOL_CALL_IN_CONTENT": "true" } } ``` ### Local Development ```json "browser-use": { "command": "uv", "args": [ "--directory", "/path/to/mcp-browser-use", "run", "mcp-server-browser-use" ], "env": { ... } } ``` ## Development ```bash # Install dev dependencies uv sync # Run with debugger npx @modelcontextprotocol/inspector uv --directory . run mcp-server-browser-use ``` ## Troubleshooting - **Browser Conflicts**: Close all Chrome instances before starting. - **API Errors**: Verify API keys in environment variables match your LLM provider. - **Vision Support**: Ensure `MCP_USE_VISION=true` for screenshot analysis. ## Provider Configuration The server supports multiple LLM providers through environment variables. 
Here are the available options for `MCP_MODEL_PROVIDER`: | Provider | Value | Required Env Variables | |----------|--------|----------------------| | Anthropic | `anthropic` | `ANTHROPIC_API_KEY`<br>`ANTHROPIC_ENDPOINT` (optional) | | OpenAI | `openai` | `OPENAI_API_KEY`<br>`OPENAI_ENDPOINT` (optional) | | Azure OpenAI | `azure_openai` | `AZURE_OPENAI_API_KEY`<br>`AZURE_OPENAI_ENDPOINT` | | DeepSeek | `deepseek` | `DEEPSEEK_API_KEY`<br>`DEEPSEEK_ENDPOINT` (optional) | | Gemini | `gemini` | `GOOGLE_API_KEY` | | Mistral | `mistral` | `MISTRAL_API_KEY`<br>`MISTRAL_ENDPOINT` (optional) | | Ollama | `ollama` | `OLLAMA_ENDPOINT` (optional, defaults to localhost:11434) | | OpenRouter | `openrouter` | `OPENROUTER_API_KEY`<br>`OPENROUTER_ENDPOINT` (optional) | ### Notes: - For endpoints marked as optional, default values will be used if not specified - Temperature can be configured using `MCP_TEMPERATURE` (default: 0.3) - Model can be specified using `MCP_MODEL_NAME` - For Ollama models, additional context settings like `num_ctx` and `num_predict` are configurable ## Credits This project extends the [browser-use/web-ui](https://github.com/browser-use/web-ui) under MIT License. Special thanks to the original authors for their browser automation framework. ## License MIT - See [LICENSE](LICENSE) for details. ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/agent/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/browser/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/controller/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/utils/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/__init__.py: -------------------------------------------------------------------------------- ```python """MCP server for browser-use""" from mcp_server_browser_use.server import app, main __all__ = ["app", "main"] ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/browser/custom_context.py: -------------------------------------------------------------------------------- ```python import json import logging import os from browser_use.browser.browser import Browser from browser_use.browser.context import BrowserContext, BrowserContextConfig from playwright.async_api import Browser as PlaywrightBrowser from playwright.async_api import BrowserContext as PlaywrightBrowserContext logger = logging.getLogger(__name__) class CustomBrowserContext(BrowserContext): def __init__( self, browser: "Browser", config: BrowserContextConfig = BrowserContextConfig() ): super(CustomBrowserContext, self).__init__(browser=browser, config=config) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/utils/agent_state.py: -------------------------------------------------------------------------------- 
```python import asyncio class AgentState: _instance = None def __init__(self): if not hasattr(self, '_stop_requested'): self._stop_requested = asyncio.Event() self.last_valid_state = None # store the last valid browser state def __new__(cls): if cls._instance is None: cls._instance = super(AgentState, cls).__new__(cls) return cls._instance def request_stop(self): self._stop_requested.set() def clear_stop(self): self._stop_requested.clear() self.last_valid_state = None def is_stop_requested(self): return self._stop_requested.is_set() def set_last_valid_state(self, state): self.last_valid_state = state def get_last_valid_state(self): return self.last_valid_state ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml startCommand: type: stdio configSchema: type: object required: - OPENROUTER_API_KEY properties: OPENROUTER_API_KEY: type: string description: "The API key for OpenRouter." MCP_MODEL_NAME: type: string description: "The model to use on OpenRouter (default: openai/o3-mini-high)." BROWSER_HEADLESS: type: string description: "Set to 'true' to run the browser in headless mode (default: 'false')." commandFunction: |- (config) => ({ command: 'mcp-server-browser-use', args: [], env: { OPENROUTER_API_KEY: config.OPENROUTER_API_KEY || '', MCP_MODEL_NAME: config.MCP_MODEL_NAME || 'openai/o3-mini-high', BROWSER_HEADLESS: config.BROWSER_HEADLESS || 'false', PORT: '8000' } }) ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Use the official Python 3.13 slim image as base FROM python:3.13-slim # Install system dependencies: Chromium, its driver, Xvfb for headless operation, and ca-certificates RUN apt-get update && \ apt-get install -y --no-install-recommends \ chromium \ chromium-driver \ xvfb \ ca-certificates && \ rm -rf /var/lib/apt/lists/* # Set the working directory WORKDIR /app # Copy project files into the container COPY . /app # Install uv (fast Python package installer) using pip RUN pip install --upgrade pip && pip install uv # Install project dependencies using uv's pip command RUN pip install . 
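# NOTE: the project is installed with plain pip above; the uv binary installed
# earlier is used by the ENTRYPOINT below.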
# Expose port 8000, which the server listens on (matches PORT in smithery.yaml)
EXPOSE 8000

# Set environment variables as needed (adjust as necessary)
ENV BROWSER_HEADLESS="true" \
    BROWSER_USE_LOGGING_LEVEL="info"

# Download the Playwright browser binaries
RUN playwright install

# Set the entrypoint to the command that Smithery expects
ENTRYPOINT ["uv", "run", "src/mcp_server_browser_use/server.py"]

# Default command if no arguments are provided
CMD ["--help"]
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp_server_browser_use"
version = "0.1.3"
description = "MCP server for browser-use"
readme = "README.md"
requires-python = ">=3.11"
authors = [
    { name = "Igor Tarasenko" },
    { name = "Martin Jakobsson" },
]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.11",
    "Operating System :: OS Independent",
]

dependencies = [
    "langchain>=0.3.14",
    "langchain-openai>=0.2.14",
    "pydantic>=2.10.5",
    "fastapi>=0.115.6",
    "uvicorn>=0.22.0",
    "openai>=1.59.5",
    "python-dotenv>=1.0.1",
    "pyperclip>=1.9.0",
    "langchain-ollama>=0.2.2",
    "instructor>=1.7.2",
    "json-repair>=0.35.0",
    "langchain-mistralai>=0.2.6",
    "fastmcp>=0.4.1",
    "browser-use==0.1.29",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_browser_use"]

[project.scripts]
mcp-server-browser-use = "mcp_server_browser_use.server:main"
```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/agent/custom_views.py:
--------------------------------------------------------------------------------

```python
from dataclasses import dataclass
from typing import Type

from browser_use.agent.views import AgentOutput
from browser_use.controller.registry.views import ActionModel
from pydantic import BaseModel, ConfigDict, Field, create_model


@dataclass
class CustomAgentStepInfo:
    step_number: int
    max_steps: int
    task: str
    add_infos: str
    memory: str
    task_progress: str
    future_plans: str


class CustomAgentBrain(BaseModel):
    """Current state of the agent"""

    prev_action_evaluation: str
    important_contents: str
    task_progress: str
    future_plans: str
    thought: str
    summary: str


class CustomAgentOutput(AgentOutput):
    """Output model for agent

    @dev note: this model is extended with custom actions in AgentService.
    You can also use fields that are not declared in this model (even if the
    linter flags them), as long as they are registered in the DynamicActions
    model.
""" model_config = ConfigDict(arbitrary_types_allowed=True) current_state: CustomAgentBrain action: list[ActionModel] @staticmethod def type_with_custom_actions( custom_actions: Type[ActionModel], ) -> Type["CustomAgentOutput"]: """Extend actions with custom actions""" return create_model( "CustomAgentOutput", __base__=CustomAgentOutput, action=( list[custom_actions], Field(...), ), # Properly annotated field with no default __module__=CustomAgentOutput.__module__, ) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/controller/custom_controller.py: -------------------------------------------------------------------------------- ```python import pdb import pyperclip from typing import Optional, Type from pydantic import BaseModel from browser_use.agent.views import ActionResult from browser_use.browser.context import BrowserContext from browser_use.controller.service import Controller, DoneAction from main_content_extractor import MainContentExtractor from browser_use.controller.views import ( ClickElementAction, DoneAction, ExtractPageContentAction, GoToUrlAction, InputTextAction, OpenTabAction, ScrollAction, SearchGoogleAction, SendKeysAction, SwitchTabAction, ) import logging logger = logging.getLogger(__name__) class CustomController(Controller): def __init__(self, exclude_actions: list[str] = [], output_model: Optional[Type[BaseModel]] = None ): super().__init__(exclude_actions=exclude_actions, output_model=output_model) self._register_custom_actions() def _register_custom_actions(self): """Register all custom browser actions""" @self.registry.action("Copy text to clipboard") def copy_to_clipboard(text: str): pyperclip.copy(text) return ActionResult(extracted_content=text) @self.registry.action("Paste text from clipboard", requires_browser=True) async def paste_from_clipboard(browser: BrowserContext): text = pyperclip.paste() # send text to browser page = await browser.get_current_page() await page.keyboard.type(text) return ActionResult(extracted_content=text) @self.registry.action( 'Extract page content to get the pure text or markdown with links if include_links is set to true', param_model=ExtractPageContentAction, requires_browser=True, ) async def extract_content(params: ExtractPageContentAction, browser: BrowserContext): page = await browser.get_current_page() # use jina reader url = page.url jina_url = f"https://r.jina.ai/{url}" await page.goto(jina_url) output_format = 'markdown' if params.include_links else 'text' content = MainContentExtractor.extract( # type: ignore html=await page.content(), output_format=output_format, ) # go back to org url await page.go_back() msg = f'📄 Extracted page content as {output_format}\n: {content}\n' logger.info(msg) return ActionResult(extracted_content=msg) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/browser/custom_browser.py: -------------------------------------------------------------------------------- ```python import asyncio import pdb from playwright.async_api import Browser as PlaywrightBrowser from playwright.async_api import ( BrowserContext as PlaywrightBrowserContext, ) from playwright.async_api import ( Playwright, async_playwright, ) from browser_use.browser.browser import Browser from browser_use.browser.context import BrowserContext, BrowserContextConfig from playwright.async_api import BrowserContext as PlaywrightBrowserContext import logging from .custom_context import CustomBrowserContext 
logger = logging.getLogger(__name__)


class CustomBrowser(Browser):
    async def new_context(
        self, config: BrowserContextConfig = BrowserContextConfig()
    ) -> CustomBrowserContext:
        return CustomBrowserContext(config=config, browser=self)

    async def _setup_browser_with_instance(self, playwright: Playwright) -> PlaywrightBrowser:
        """Sets up and returns a Playwright Browser instance with anti-detection measures."""
        if not self.config.chrome_instance_path:
            raise ValueError('Chrome instance path is required')
        import subprocess

        import requests

        try:
            # Check if browser is already running
            response = requests.get('http://localhost:9222/json/version', timeout=2)
            if response.status_code == 200:
                logger.info('Reusing existing Chrome instance')
                browser = await playwright.chromium.connect_over_cdp(
                    endpoint_url='http://localhost:9222',
                    timeout=20000,  # 20 second timeout for connection
                )
                return browser
        except requests.ConnectionError:
            logger.debug('No existing Chrome instance found, starting a new one')

        # Start a new Chrome instance
        subprocess.Popen(
            [
                self.config.chrome_instance_path,
                '--remote-debugging-port=9222',
            ]
            + self.config.extra_chromium_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # Poll the debugging endpoint, in case the browser has not finished starting yet
        for _ in range(10):
            try:
                response = requests.get('http://localhost:9222/json/version', timeout=2)
                if response.status_code == 200:
                    break
            except requests.ConnectionError:
                pass
            await asyncio.sleep(1)

        # Attempt to connect again after starting a new instance
        try:
            browser = await playwright.chromium.connect_over_cdp(
                endpoint_url='http://localhost:9222',
                timeout=20000,  # 20 second timeout for connection
            )
            return browser
        except Exception as e:
            logger.error(f'Failed to start a new Chrome instance: {str(e)}')
            raise RuntimeError(
                'To start Chrome in debug mode, close all existing Chrome instances and try again; otherwise we cannot connect to the instance.'
) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/utils/llm.py: -------------------------------------------------------------------------------- ```python from openai import OpenAI import pdb from langchain_openai import ChatOpenAI from langchain_core.globals import get_llm_cache from langchain_core.language_models.base import ( BaseLanguageModel, LangSmithParams, LanguageModelInput, ) from langchain_core.load import dumpd, dumps from langchain_core.messages import ( AIMessage, SystemMessage, AnyMessage, BaseMessage, BaseMessageChunk, HumanMessage, convert_to_messages, message_chunk_to_message, ) from langchain_core.outputs import ( ChatGeneration, ChatGenerationChunk, ChatResult, LLMResult, RunInfo, ) from langchain_ollama import ChatOllama from langchain_core.output_parsers.base import OutputParserLike from langchain_core.runnables import Runnable, RunnableConfig from langchain_core.tools import BaseTool from typing import ( TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast, ) class DeepSeekR1ChatOpenAI(ChatOpenAI): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.client = OpenAI( base_url=kwargs.get("base_url"), api_key=kwargs.get("api_key") ) async def ainvoke( self, input: LanguageModelInput, config: Optional[RunnableConfig] = None, *, stop: Optional[list[str]] = None, **kwargs: Any, ) -> AIMessage: message_history = [] for input_ in input: if isinstance(input_, SystemMessage): message_history.append({"role": "system", "content": input_.content}) elif isinstance(input_, AIMessage): message_history.append({"role": "assistant", "content": input_.content}) else: message_history.append({"role": "user", "content": input_.content}) response = self.client.chat.completions.create( model=self.model_name, messages=message_history ) reasoning_content = response.choices[0].message.reasoning_content content = response.choices[0].message.content return AIMessage(content=content, reasoning_content=reasoning_content) def invoke( self, input: LanguageModelInput, config: Optional[RunnableConfig] = None, *, stop: Optional[list[str]] = None, **kwargs: Any, ) -> AIMessage: message_history = [] for input_ in input: if isinstance(input_, SystemMessage): message_history.append({"role": "system", "content": input_.content}) elif isinstance(input_, AIMessage): message_history.append({"role": "assistant", "content": input_.content}) else: message_history.append({"role": "user", "content": input_.content}) response = self.client.chat.completions.create( model=self.model_name, messages=message_history ) reasoning_content = response.choices[0].message.reasoning_content content = response.choices[0].message.content return AIMessage(content=content, reasoning_content=reasoning_content) class DeepSeekR1ChatOllama(ChatOllama): async def ainvoke( self, input: LanguageModelInput, config: Optional[RunnableConfig] = None, *, stop: Optional[list[str]] = None, **kwargs: Any, ) -> AIMessage: org_ai_message = await super().ainvoke(input=input) org_content = org_ai_message.content reasoning_content = org_content.split("</think>")[0].replace("<think>", "") content = org_content.split("</think>")[1] if "**JSON Response:**" in content: content = content.split("**JSON Response:**")[-1] return AIMessage(content=content, reasoning_content=reasoning_content) def invoke( self, input: LanguageModelInput, config: Optional[RunnableConfig] = None, *, stop: Optional[list[str]] = None, **kwargs: Any, ) -> AIMessage: 
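        # Synchronous counterpart of ainvoke: the text before </think> is exposed
        # as reasoning_content, and the remainder (minus any "**JSON Response:**"
        # marker) becomes the visible content.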
org_ai_message = super().invoke(input=input) org_content = org_ai_message.content reasoning_content = org_content.split("</think>")[0].replace("<think>", "") content = org_content.split("</think>")[1] if "**JSON Response:**" in content: content = content.split("**JSON Response:**")[-1] return AIMessage(content=content, reasoning_content=reasoning_content) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/agent/custom_massage_manager.py: -------------------------------------------------------------------------------- ```python from __future__ import annotations import logging from typing import List, Optional, Type from browser_use.agent.message_manager.service import MessageManager from browser_use.agent.message_manager.views import MessageHistory from browser_use.agent.prompts import SystemPrompt, AgentMessagePrompt from browser_use.agent.views import ActionResult, AgentStepInfo, ActionModel from browser_use.browser.views import BrowserState from langchain_core.language_models import BaseChatModel from langchain_anthropic import ChatAnthropic from langchain_core.language_models import BaseChatModel from langchain_core.messages import ( AIMessage, BaseMessage, HumanMessage, ToolMessage ) from langchain_openai import ChatOpenAI from ..utils.llm import DeepSeekR1ChatOpenAI from .custom_prompts import CustomAgentMessagePrompt logger = logging.getLogger(__name__) class CustomMassageManager(MessageManager): def __init__( self, llm: BaseChatModel, task: str, action_descriptions: str, system_prompt_class: Type[SystemPrompt], agent_prompt_class: Type[AgentMessagePrompt], max_input_tokens: int = 128000, estimated_characters_per_token: int = 3, image_tokens: int = 800, include_attributes: list[str] = [], max_error_length: int = 400, max_actions_per_step: int = 10, message_context: Optional[str] = None ): super().__init__( llm=llm, task=task, action_descriptions=action_descriptions, system_prompt_class=system_prompt_class, max_input_tokens=max_input_tokens, estimated_characters_per_token=estimated_characters_per_token, image_tokens=image_tokens, include_attributes=include_attributes, max_error_length=max_error_length, max_actions_per_step=max_actions_per_step, message_context=message_context ) self.agent_prompt_class = agent_prompt_class # Custom: Move Task info to state_message self.history = MessageHistory() self._add_message_with_tokens(self.system_prompt) if self.message_context: context_message = HumanMessage(content=self.message_context) self._add_message_with_tokens(context_message) def cut_messages(self): """Get current message list, potentially trimmed to max tokens""" diff = self.history.total_tokens - self.max_input_tokens min_message_len = 2 if self.message_context is not None else 1 while diff > 0 and len(self.history.messages) > min_message_len: self.history.remove_message(min_message_len) # always remove the oldest message diff = self.history.total_tokens - self.max_input_tokens def add_state_message( self, state: BrowserState, actions: Optional[List[ActionModel]] = None, result: Optional[List[ActionResult]] = None, step_info: Optional[AgentStepInfo] = None, ) -> None: """Add browser state as human message""" # otherwise add state message and result to next message (which will not stay in memory) state_message = self.agent_prompt_class( state, actions, result, include_attributes=self.include_attributes, max_error_length=self.max_error_length, step_info=step_info, ).get_user_message() 
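        # Token-count the rendered state message and append it to the history.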
self._add_message_with_tokens(state_message)

    def _count_text_tokens(self, text: str) -> int:
        if isinstance(self.llm, (ChatOpenAI, ChatAnthropic, DeepSeekR1ChatOpenAI)):
            try:
                tokens = self.llm.get_num_tokens(text)
            except Exception:
                tokens = (
                    len(text) // self.estimated_characters_per_token
                )  # Rough estimate if no tokenizer is available
        else:
            tokens = (
                len(text) // self.estimated_characters_per_token
            )  # Rough estimate if no tokenizer is available
        return tokens

    def _remove_state_message_by_index(self, remove_ind=-1) -> None:
        """Remove the Nth most recent state (human) message from history"""
        i = len(self.history.messages) - 1
        remove_cnt = 0
        while i >= 0:
            if isinstance(self.history.messages[i].message, HumanMessage):
                remove_cnt += 1
            if remove_cnt == abs(remove_ind):
                self.history.remove_message(i)
                break
            i -= 1
```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
import sys
import traceback
from typing import List, Optional

import logging

logging.getLogger().addHandler(logging.NullHandler())
logging.getLogger().propagate = False

from mcp_server_browser_use.agent.custom_prompts import (
    CustomAgentMessagePrompt,
    CustomSystemPrompt,
)

from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContextConfig, BrowserContextWindowSize
from fastmcp.server import FastMCP
from mcp.types import TextContent

from mcp_server_browser_use.agent.custom_agent import CustomAgent
from mcp_server_browser_use.browser.custom_browser import CustomBrowser
from mcp_server_browser_use.controller.custom_controller import CustomController
from mcp_server_browser_use.utils import utils
from mcp_server_browser_use.utils.agent_state import AgentState

# Global references for the single "running agent" approach
_global_agent = None
_global_browser = None
_global_browser_context = None
_global_agent_state = AgentState()

app = FastMCP("mcp_server_browser_use")


def get_env_bool(key: str, default: bool = False) -> bool:
    """Get a boolean value from an environment variable."""
    return os.getenv(key, str(default)).lower() in ("true", "1", "yes")


async def _safe_cleanup():
    """Safely clean up browser resources"""
    global _global_browser, _global_agent_state, _global_browser_context, _global_agent

    try:
        if _global_agent_state:
            try:
                _global_agent_state.request_stop()  # synchronous, must not be awaited
            except Exception:
                pass
        if _global_browser_context:
            try:
                await _global_browser_context.close()
            except Exception:
                pass
        if _global_browser:
            try:
                await _global_browser.close()
            except Exception:
                pass
    except Exception as e:
        # Log the error, but don't re-raise
        print(f"Error during cleanup: {e}", file=sys.stderr)
    finally:
        # Reset global variables
        _global_browser = None
        _global_browser_context = None
        _global_agent_state = AgentState()
        _global_agent = None


@app.tool()
async def run_browser_agent(task: str, add_infos: str = "") -> str:
    """Handle run-browser-agent tool calls."""
    global _global_agent, _global_browser, _global_browser_context, _global_agent_state

    try:
        # Clear any previous agent stop signals
        _global_agent_state.clear_stop()

        # Get browser configuration
        headless = get_env_bool("BROWSER_HEADLESS", True)
        disable_security = get_env_bool("BROWSER_DISABLE_SECURITY", False)
        window_w = int(os.getenv("BROWSER_WINDOW_WIDTH", "1280"))
        window_h = int(os.getenv("BROWSER_WINDOW_HEIGHT", "720"))

        # Get agent configuration
        model_provider = os.getenv("MCP_MODEL_PROVIDER", "openrouter")
        model_name = os.getenv("MCP_MODEL_NAME", "openai/o3-mini-high")
        temperature = float(os.getenv("MCP_TEMPERATURE", "0.7"))
        max_steps = int(os.getenv("MCP_MAX_STEPS", "100"))
        use_vision = get_env_bool("MCP_USE_VISION", True)
        max_actions_per_step = int(os.getenv("MCP_MAX_ACTIONS_PER_STEP", "5"))
        tool_calling_method = os.getenv("MCP_TOOL_CALLING_METHOD", "auto")

        # Configure browser window size
        extra_chromium_args = [f"--window-size={window_w},{window_h}"]

        # Initialize browser if needed
        if not _global_browser:
            _global_browser = CustomBrowser(
                config=BrowserConfig(
                    headless=headless,
                    disable_security=disable_security,
                    extra_chromium_args=extra_chromium_args,
                )
            )

        # Initialize browser context if needed
        if not _global_browser_context:
            _global_browser_context = await _global_browser.new_context(
                config=BrowserContextConfig(
                    trace_path=os.getenv("BROWSER_TRACE_PATH"),
                    save_recording_path=os.getenv("BROWSER_RECORDING_PATH"),
                    no_viewport=False,
                    browser_window_size=BrowserContextWindowSize(
                        width=window_w, height=window_h
                    ),
                )
            )

        # Prepare LLM
        llm = utils.get_llm_model(
            provider=model_provider, model_name=model_name, temperature=temperature
        )

        # Create controller and agent
        controller = CustomController()
        _global_agent = CustomAgent(
            task=task,
            add_infos=add_infos,
            use_vision=use_vision,
            llm=llm,
            browser=_global_browser,
            browser_context=_global_browser_context,
            controller=controller,
            system_prompt_class=CustomSystemPrompt,
            agent_prompt_class=CustomAgentMessagePrompt,
            max_actions_per_step=max_actions_per_step,
            agent_state=_global_agent_state,
            tool_calling_method=tool_calling_method,
        )

        # Run agent with improved error handling
        try:
            history = await _global_agent.run(max_steps=max_steps)
            final_result = (
                history.final_result()
                or f"No final result. Possibly incomplete. {history}"
            )
            return final_result
        except asyncio.CancelledError:
            return "Task was cancelled"
        except Exception as e:
            logging.error(f"Agent run error: {str(e)}\n{traceback.format_exc()}")
            return f"Error during task execution: {str(e)}"

    except Exception as e:
        logging.error(f"run-browser-agent error: {str(e)}\n{traceback.format_exc()}")
        return f"Error during task execution: {str(e)}"
    finally:
        asyncio.create_task(_safe_cleanup())


def main():
    app.run()


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/utils/utils.py:
--------------------------------------------------------------------------------

```python
import base64
import os
import time
from pathlib import Path
from typing import Dict, Optional

import requests
from langchain_anthropic import ChatAnthropic
from langchain_mistralai import ChatMistralAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from langchain_openai import AzureChatOpenAI, ChatOpenAI

from .llm import DeepSeekR1ChatOpenAI, DeepSeekR1ChatOllama


def get_llm_model(provider: str, **kwargs):
    """
    Get an LLM model instance.

    :param provider: the model provider type
    :param kwargs: provider-specific options (api_key, base_url, model_name, temperature, ...)
    :return: a configured chat model
    """
    if provider not in ["ollama"]:
        env_var = (
            "GOOGLE_API_KEY" if provider == "gemini" else f"{provider.upper()}_API_KEY"
        )
        api_key = kwargs.get("api_key", "") or os.getenv(env_var, "")
        if not api_key:
            raise ValueError(f"API key for {provider} is not set")
        kwargs["api_key"] = api_key

    if provider == "anthropic":
        if not kwargs.get("base_url", ""):
            base_url = "https://api.anthropic.com"
        else:
            base_url = kwargs.get("base_url")

        return ChatAnthropic(
            model_name=kwargs.get("model_name", "claude-3-5-sonnet-20240620"),
            temperature=kwargs.get("temperature", 0.0),
base_url=base_url, api_key=api_key, ) elif provider == "mistral": if not kwargs.get("base_url", ""): base_url = os.getenv("MISTRAL_ENDPOINT", "https://api.mistral.ai/v1") else: base_url = kwargs.get("base_url") if not kwargs.get("api_key", ""): api_key = os.getenv("MISTRAL_API_KEY", "") else: api_key = kwargs.get("api_key") return ChatMistralAI( model=kwargs.get("model_name", "mistral-large-latest"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "openai": if not kwargs.get("base_url", ""): base_url = os.getenv("OPENAI_ENDPOINT", "https://api.openai.com/v1") else: base_url = kwargs.get("base_url") return ChatOpenAI( model=kwargs.get("model_name", "gpt-4o"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "deepseek": if not kwargs.get("base_url", ""): base_url = os.getenv("DEEPSEEK_ENDPOINT", "") else: base_url = kwargs.get("base_url") if kwargs.get("model_name", "deepseek-chat") == "deepseek-reasoner": return DeepSeekR1ChatOpenAI( model=kwargs.get("model_name", "deepseek-reasoner"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) else: return ChatOpenAI( model=kwargs.get("model_name", "deepseek-chat"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "gemini": return ChatGoogleGenerativeAI( model=kwargs.get("model_name", "gemini-2.0-flash-exp"), temperature=kwargs.get("temperature", 0.0), google_api_key=api_key, ) elif provider == "ollama": if not kwargs.get("base_url", ""): base_url = os.getenv("OLLAMA_ENDPOINT", "http://localhost:11434") else: base_url = kwargs.get("base_url") if "deepseek-r1" in kwargs.get("model_name", "qwen2.5:7b"): return DeepSeekR1ChatOllama( model=kwargs.get("model_name", "deepseek-r1:14b"), temperature=kwargs.get("temperature", 0.0), num_ctx=kwargs.get("num_ctx", 32000), base_url=base_url, ) else: return ChatOllama( model=kwargs.get("model_name", "qwen2.5:7b"), temperature=kwargs.get("temperature", 0.0), num_ctx=kwargs.get("num_ctx", 32000), num_predict=kwargs.get("num_predict", 1024), base_url=base_url, ) elif provider == "azure_openai": if not kwargs.get("base_url", ""): base_url = os.getenv("AZURE_OPENAI_ENDPOINT", "") else: base_url = kwargs.get("base_url") return AzureChatOpenAI( model=kwargs.get("model_name", "gpt-4o"), temperature=kwargs.get("temperature", 0.0), api_version="2024-05-01-preview", azure_endpoint=base_url, api_key=api_key, ) elif provider == "openrouter": if not kwargs.get("base_url", ""): base_url = os.getenv("OPENROUTER_ENDPOINT", "") else: base_url = kwargs.get("base_url") model_name = kwargs.get("model_name", "openai/o3-mini-high") if "r1" in model_name or "aion" in model_name: return DeepSeekR1ChatOpenAI( model=model_name, temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) else: return ChatOpenAI( model=model_name, temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) else: raise ValueError(f"Unsupported provider: {provider}") def encode_image(img_path): if not img_path: return None with open(img_path, "rb") as fin: image_data = base64.b64encode(fin.read()).decode("utf-8") return image_data def get_latest_files( directory: str, file_types: list = [".webm", ".zip"] ) -> Dict[str, Optional[str]]: """Get the latest recording and trace files""" latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types} if not os.path.exists(directory): os.makedirs(directory, exist_ok=True) return 
latest_files for file_type in file_types: try: matches = list(Path(directory).rglob(f"*{file_type}")) if matches: latest = max(matches, key=lambda p: p.stat().st_mtime) # Only return files that are complete (not being written) if time.time() - latest.stat().st_mtime > 1.0: latest_files[file_type] = str(latest) except Exception as e: print(f"Error getting latest {file_type} file: {e}") return latest_files async def capture_screenshot(browser_context): """Capture and encode a screenshot""" # Extract the Playwright browser instance playwright_browser = ( browser_context.browser.playwright_browser ) # Ensure this is correct. # Check if the browser instance is valid and if an existing context can be reused if playwright_browser and playwright_browser.contexts: playwright_context = playwright_browser.contexts[0] else: return None # Access pages in the context pages = None if playwright_context: pages = playwright_context.pages # Use an existing page or create a new one if none exist if pages: active_page = pages[0] for page in pages: if page.url != "about:blank": active_page = page else: return None # Take screenshot try: screenshot = await active_page.screenshot(type="jpeg", quality=75, scale="css") encoded = base64.b64encode(screenshot).decode("utf-8") return encoded except Exception as e: return None ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/agent/custom_prompts.py: -------------------------------------------------------------------------------- ```python import pdb from typing import List, Optional from browser_use.agent.prompts import SystemPrompt, AgentMessagePrompt from browser_use.agent.views import ActionResult, ActionModel from browser_use.browser.views import BrowserState from langchain_core.messages import HumanMessage, SystemMessage from datetime import datetime from .custom_views import CustomAgentStepInfo class CustomSystemPrompt(SystemPrompt): def important_rules(self) -> str: """ Returns the important rules for the agent. """ text = r""" 1. RESPONSE FORMAT: You must ALWAYS respond with valid JSON in this exact format: { "current_state": { "prev_action_evaluation": "Success|Failed|Unknown - Analyze the current elements and the image to check if the previous goals/actions are successful like intended by the task. Ignore the action result. The website is the ground truth. Also mention if something unexpected happened like new suggestions in an input field. Shortly state why/why not. Note that the result you output must be consistent with the reasoning you output afterwards. If you consider it to be 'Failed,' you should reflect on this during your thought.", "important_contents": "Output important contents closely related to user\'s instruction on the current page. If there is, please output the contents. If not, please output empty string ''.", "task_progress": "Task Progress is a general summary of the current contents that have been completed. Just summarize the contents that have been actually completed based on the content at current step and the history operations. Please list each completed item individually, such as: 1. Input username. 2. Input Password. 3. Click confirm button. Please return string type not a list.", "future_plans": "Based on the user's request and the current state, outline the remaining steps needed to complete the task. This should be a concise list of actions yet to be performed, such as: 1. Select a date. 2. Choose a specific time slot. 3. Confirm booking. 
Please return string type not a list.", "thought": "Think about the requirements that have been completed in previous operations and the requirements that need to be completed in the next one operation. If your output of prev_action_evaluation is 'Failed', please reflect and output your reflection here.", "summary": "Please generate a brief natural language description for the operation in next actions based on your Thought." }, "action": [ * actions in sequences, please refer to **Common action sequences**. Each output action MUST be formated as: \{action_name\: action_params\}* ] } 2. ACTIONS: You can specify multiple actions to be executed in sequence. Common action sequences: - Form filling: [ {"input_text": {"index": 1, "text": "username"}}, {"input_text": {"index": 2, "text": "password"}}, {"click_element": {"index": 3}} ] - Navigation and extraction: [ {"go_to_url": {"url": "https://example.com"}}, {"extract_page_content": {}} ] 3. ELEMENT INTERACTION: - Only use indexes that exist in the provided element list - Each element has a unique index number (e.g., "33[:]<button>") - Elements marked with "_[:]" are non-interactive (for context only) 4. NAVIGATION & ERROR HANDLING: - If no suitable elements exist, use other functions to complete the task - If stuck, try alternative approaches - Handle popups/cookies by accepting or closing them - Use scroll to find elements you are looking for 5. TASK COMPLETION: - If you think all the requirements of user\'s instruction have been completed and no further operation is required, output the **Done** action to terminate the operation process. - Don't hallucinate actions. - If the task requires specific information - make sure to include everything in the done function. This is what the user will see. - If you are running out of steps (current step), think about speeding it up, and ALWAYS use the done action as the last action. - Note that you must verify if you've truly fulfilled the user's request by examining the actual page content, not just by looking at the actions you output but also whether the action is executed successfully. Pay particular attention when errors occur during action execution. 6. VISUAL CONTEXT: - When an image is provided, use it to understand the page layout - Bounding boxes with labels correspond to element indexes - Each bounding box and its label have the same color - Most often the label is inside the bounding box, on the top right - Visual context helps verify element locations and relationships - sometimes labels overlap, so use the context to verify the correct element 7. Form filling: - If you fill an input field and your action sequence is interrupted, most often a list with suggestions poped up under the field and you need to first select the right element from the suggestion list. 8. ACTION SEQUENCING: - Actions are executed in the order they appear in the list - Each action should logically follow from the previous one - If the page changes after an action, the sequence is interrupted and you get the new state. - If content only disappears the sequence continues. - Only provide the action sequence until you think the page will change. - Try to be efficient, e.g. fill forms at once, or chain actions where nothing changes on the page like saving, extracting, checkboxes... - only use multiple actions if it makes sense. """ text += f" - use maximum {self.max_actions_per_step} actions per sequence" return text def input_format(self) -> str: return """ INPUT STRUCTURE: 1. 
Task: The user\'s instructions you need to complete.
2. Hints(Optional): Some hints to help you complete the user\'s instructions.
3. Memory: Important contents are recorded during historical operations for use in subsequent operations.
4. Current URL: The webpage you're currently on
5. Available Tabs: List of open browser tabs
6. Interactive Elements: List in the format:
   index[:]<element_type>element_text</element_type>
   - index: Numeric identifier for interaction
   - element_type: HTML element type (button, input, etc.)
   - element_text: Visible text or element description

Example:
33[:]<button>Submit Form</button>
_[:] Non-interactive text

Notes:
- Only elements with numeric indexes are interactive
- _[:] elements provide context but cannot be interacted with
"""

    def get_system_message(self) -> SystemMessage:
        """
        Get the system prompt for the agent.

        Returns:
            SystemMessage: Formatted system prompt
        """
        AGENT_PROMPT = f"""You are a precise browser automation agent that interacts with websites through structured commands. Your role is to:
1. Analyze the provided webpage elements and structure
2. Plan a sequence of actions to accomplish the given task
3. Your final result MUST be valid JSON in the **RESPONSE FORMAT** described, containing your action sequence and state assessment; no extra explanatory content is needed.

{self.input_format()}

{self.important_rules()}

Functions:
{self.default_action_description}

Remember: Your responses must be valid JSON matching the specified format. Each action in the sequence must be valid."""
        return SystemMessage(content=AGENT_PROMPT)


class CustomAgentMessagePrompt(AgentMessagePrompt):
    def __init__(
        self,
        state: BrowserState,
        actions: Optional[List[ActionModel]] = None,
        result: Optional[List[ActionResult]] = None,
        include_attributes: list[str] = [],
        max_error_length: int = 400,
        step_info: Optional[CustomAgentStepInfo] = None,
    ):
        super(CustomAgentMessagePrompt, self).__init__(
            state=state,
            result=result,
            include_attributes=include_attributes,
            max_error_length=max_error_length,
            step_info=step_info,
        )
        self.actions = actions

    def get_user_message(self) -> HumanMessage:
        if self.step_info:
            step_info_description = f'Current step: {self.step_info.step_number}/{self.step_info.max_steps}\n'
        else:
            step_info_description = ''

        time_str = datetime.now().strftime("%Y-%m-%d %H:%M")
        step_info_description += f"Current date and time: {time_str}"

        elements_text = self.state.element_tree.clickable_elements_to_string(include_attributes=self.include_attributes)

        has_content_above = (self.state.pixels_above or 0) > 0
        has_content_below = (self.state.pixels_below or 0) > 0

        if elements_text != '':
            if has_content_above:
                elements_text = (
                    f'... {self.state.pixels_above} pixels above - scroll or extract content to see more ...\n{elements_text}'
                )
            else:
                elements_text = f'[Start of page]\n{elements_text}'
            if has_content_below:
                elements_text = (
                    f'{elements_text}\n... {self.state.pixels_below} pixels below - scroll or extract content to see more ...'
                )
            else:
                elements_text = f'{elements_text}\n[End of page]'
        else:
            elements_text = 'empty page'

        state_description = f"""
{step_info_description}
1. Task: {self.step_info.task}.
2. Hints(Optional):
{self.step_info.add_infos}
3. Memory:
{self.step_info.memory}
4. Current url: {self.state.url}
5. Available tabs:
{self.state.tabs}
6.
Interactive elements: {elements_text} """ if self.actions and self.result: state_description += "\n **Previous Actions** \n" state_description += f'Previous step: {self.step_info.step_number-1}/{self.step_info.max_steps} \n' for i, result in enumerate(self.result): action = self.actions[i] state_description += f"Previous action {i + 1}/{len(self.result)}: {action.model_dump_json(exclude_unset=True)}\n" if result.include_in_memory: if result.extracted_content: state_description += f"Result of previous action {i + 1}/{len(self.result)}: {result.extracted_content}\n" if result.error: # only use last 300 characters of error error = result.error[-self.max_error_length:] state_description += ( f"Error of previous action {i + 1}/{len(self.result)}: ...{error}\n" ) if self.state.screenshot: # Format message for vision model return HumanMessage( content=[ {"type": "text", "text": state_description}, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{self.state.screenshot}" }, }, ] ) return HumanMessage(content=state_description) ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/utils/deep_research.py: -------------------------------------------------------------------------------- ```python import pdb from dotenv import load_dotenv load_dotenv() import asyncio import os import sys import logging from pprint import pprint from uuid import uuid4 from mcp_server_browser_use.utils import utils from mcp_server_browser_use.agent.custom_agent import CustomAgent import json from browser_use.agent.service import Agent from browser_use.browser.browser import BrowserConfig, Browser from langchain.schema import SystemMessage, HumanMessage from json_repair import repair_json from mcp_server_browser_use.agent.custom_prompts import ( CustomSystemPrompt, CustomAgentMessagePrompt, ) from mcp_server_browser_use.controller.custom_controller import CustomController logger = logging.getLogger(__name__) async def deep_research(task, llm, **kwargs): task_id = str(uuid4()) save_dir = kwargs.get("save_dir", os.path.join(f"./tmp/deep_research/{task_id}")) logger.info(f"Save Deep Research at: {save_dir}") os.makedirs(save_dir, exist_ok=True) # max qyery num per iteration max_query_num = kwargs.get("max_query_num", 3) search_system_prompt = f""" You are a **Deep Researcher**, an AI agent specializing in in-depth information gathering and research using a web browser with **automated execution capabilities**. Your expertise lies in formulating comprehensive research plans and executing them meticulously to fulfill complex user requests. You will analyze user instructions, devise a detailed research plan, and determine the necessary search queries to gather the required information. **Your Task:** Given a user's research topic, you will: 1. **Develop a Research Plan:** Outline the key aspects and subtopics that need to be investigated to thoroughly address the user's request. This plan should be a high-level overview of the research direction. 2. **Generate Search Queries:** Based on your research plan, generate a list of specific search queries to be executed in a web browser. These queries should be designed to efficiently gather relevant information for each aspect of your plan. **Output Format:** Your output will be a JSON object with the following structure: ```json {{ "plan": "A concise, high-level research plan outlining the key areas to investigate.", "queries": [ "search query 1", "search query 2", //... 
up to a maximum of {max_query_num} search queries ] }} ``` **Important:** * Limit your output to a **maximum of {max_query_num}** search queries. * Make the search queries to help the automated agent find the needed information. Consider what keywords are most likely to lead to useful results. * If you have gathered for all the information you want and no further search queries are required, output queries with an empty list: `[]` * Make sure output search queries are different from the history queries. **Inputs:** 1. **User Instruction:** The original instruction given by the user. 2. **Previous Queries:** History Queries. 3. **Previous Search Results:** Textual data gathered from prior search queries. If there are no previous search results this string will be empty. """ search_messages = [SystemMessage(content=search_system_prompt)] record_system_prompt = """ You are an expert information recorder. Your role is to process user instructions, current search results, and previously recorded information to extract, summarize, and record new, useful information that helps fulfill the user's request. Your output will be a JSON formatted list, where each element represents a piece of extracted information and follows the structure: `{"url": "source_url", "title": "source_title", "summary_content": "concise_summary", "thinking": "reasoning"}`. **Important Considerations:** 1. **Minimize Information Loss:** While concise, prioritize retaining important details and nuances from the sources. Aim for a summary that captures the essence of the information without over-simplification. **Crucially, ensure to preserve key data and figures within the `summary_content`. This is essential for later stages, such as generating tables and reports.** 2. **Avoid Redundancy:** Do not record information that is already present in the Previous Recorded Information. Check for semantic similarity, not just exact matches. However, if the same information is expressed differently in a new source and this variation adds valuable context or clarity, it should be included. 3. **Source Information:** Extract and include the source title and URL for each piece of information summarized. This is crucial for verification and context. **The Current Search Results are provided in a specific format, where each item starts with "Title:", followed by the title, then "URL Source:", followed by the URL, and finally "Markdown Content:", followed by the content. Please extract the title and URL from this structure.** If a piece of information cannot be attributed to a specific source from the provided search results, use `"url": "unknown"` and `"title": "unknown"`. 4. **Thinking and Report Structure:** For each extracted piece of information, add a `"thinking"` key. This field should contain your assessment of how this information could be used in a report, which section it might belong to (e.g., introduction, background, analysis, conclusion, specific subtopics), and any other relevant thoughts about its significance or connection to other information. **Output Format:** Provide your output as a JSON formatted list. Each item in the list must adhere to the following format: ```json [ { "url": "source_url_1", "title": "source_title_1", "summary_content": "Concise summary of content. Remember to include key data and figures here.", "thinking": "This could be used in the introduction to set the context. It also relates to the section on the history of the topic." }, // ... 

    browser = Browser(
        config=BrowserConfig(
            disable_security=True,
            headless=kwargs.get("headless", False),  # set to False to see browser actions
        )
    )
    controller = CustomController()

    search_iteration = 0
    # limit search iterations to prevent infinite loops
    max_search_iterations = kwargs.get("max_search_iterations", 10)
    use_vision = kwargs.get("use_vision", False)

    history_query = []
    history_infos = []
    try:
        while search_iteration < max_search_iterations:
            search_iteration += 1
            logger.info(f"Start search iteration {search_iteration}...")
            history_query_ = json.dumps(history_query, indent=4)
            history_infos_ = json.dumps(history_infos, indent=4)
            query_prompt = f"This is search {search_iteration} of {max_search_iterations} maximum searches allowed.\n User Instruction:{task} \n Previous Queries:\n {history_query_} \n Previous Search Results:\n {history_infos_}\n"
            search_messages.append(HumanMessage(content=query_prompt))

            # 1. Plan the next queries (send the system prompt plus the latest human message only)
            ai_query_msg = llm.invoke(search_messages[:1] + search_messages[1:][-1:])
            search_messages.append(ai_query_msg)
            if hasattr(ai_query_msg, "reasoning_content"):
                logger.info("🤯 Start Search Deep Thinking: ")
                logger.info(ai_query_msg.reasoning_content)
                logger.info("🤯 End Search Deep Thinking")

            ai_query_content = ai_query_msg.content.replace("```json", "").replace("```", "")
            ai_query_content = repair_json(ai_query_content)
            ai_query_content = json.loads(ai_query_content)
            query_plan = ai_query_content["plan"]
            logger.info(f"Current Iteration {search_iteration} Planning:")
            logger.info(query_plan)
            query_tasks = ai_query_content["queries"]
            if not query_tasks:
                break
            else:
                history_query.extend(query_tasks)
                logger.info("Query tasks:")
                logger.info(query_tasks)

            # 2. Perform the web searches with parallel browser-use agents
            add_infos = (
                "1. Please click on the most relevant link to get information and go deeper, instead of just staying on the search page. \n"
                "2. When opening a PDF file, please remember to extract the content using extract_content instead of simply opening it for the user to view."
            )
            agents = [
                CustomAgent(
                    task=query,
                    llm=llm,
                    add_infos=add_infos,
                    browser=browser,
                    use_vision=use_vision,
                    system_prompt_class=CustomSystemPrompt,
                    agent_prompt_class=CustomAgentMessagePrompt,
                    max_actions_per_step=5,
                    controller=controller,
                )
                for query in query_tasks
            ]
            query_results = await asyncio.gather(
                *[agent.run(max_steps=kwargs.get("max_steps", 10)) for agent in agents]
            )
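
            # Note: the query agents above share a single Browser instance;
            # each CustomAgent runs against its own browser context created
            # from it, which is what lets asyncio.gather execute the query
            # tasks concurrently in one browser process.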

            # 3. Summarize the search results
            query_result_dir = os.path.join(save_dir, "query_results")
            os.makedirs(query_result_dir, exist_ok=True)
            for i in range(len(query_tasks)):
                query_result = query_results[i].final_result()
                query_save_path = os.path.join(
                    query_result_dir, f"{search_iteration}-{i}.md"
                )
                logger.info(f"save query: {query_tasks[i]} at {query_save_path}")
                with open(query_save_path, "w", encoding="utf-8") as fw:
                    fw.write(f"Query: {query_tasks[i]}\n")
                    fw.write(query_result)

                history_infos_ = json.dumps(history_infos, indent=4)
                record_prompt = f"User Instruction:{task}. \nPrevious Recorded Information:\n {history_infos_} \n Current Search Results: {query_result}\n"
                record_messages.append(HumanMessage(content=record_prompt))
                ai_record_msg = llm.invoke(record_messages[:1] + record_messages[-1:])
                record_messages.append(ai_record_msg)
                if hasattr(ai_record_msg, "reasoning_content"):
                    logger.info("🤯 Start Record Deep Thinking: ")
                    logger.info(ai_record_msg.reasoning_content)
                    logger.info("🤯 End Record Deep Thinking")
                record_content = ai_record_msg.content
                record_content = repair_json(record_content)
                new_record_infos = json.loads(record_content)
                history_infos.extend(new_record_infos)

        logger.info("\nFinish Searching, Start Generating Report...")

        # 4. Generate the report in Markdown
        writer_system_prompt = """
You are a **Deep Researcher** and a professional report writer tasked with creating polished, high-quality reports that fully meet the user's needs, based on the user's instructions and the relevant information provided. You will write the report using Markdown format, ensuring it is both informative and visually appealing.

**Specific Instructions:**

* **Structure for Impact:** The report must have a clear, logical, and impactful structure. Begin with a compelling introduction that immediately grabs the reader's attention. Develop well-structured body paragraphs that flow smoothly and logically, and conclude with a concise and memorable conclusion that summarizes key takeaways and leaves a lasting impression.
* **Engaging and Vivid Language:** Employ precise, vivid, and descriptive language to make the report captivating and enjoyable to read. Use stylistic techniques to enhance engagement. Tailor your tone, vocabulary, and writing style to perfectly suit the subject matter and the intended audience to maximize impact and readability.
* **Accuracy, Credibility, and Citations:** Ensure that all information presented is meticulously accurate, rigorously truthful, and robustly supported by the available data. **Cite sources exclusively using bracketed sequential numbers within the text (e.g., [1], [2], etc.). If no references are used, omit citations entirely.** These numbers must correspond to a numbered list of references at the end of the report.
* **Publication-Ready Formatting:** Adhere strictly to Markdown formatting for excellent readability and a clean, highly professional visual appearance. Pay close attention to formatting details like headings, lists, emphasis, and spacing to optimize the visual presentation and reader experience. The report should be ready for immediate publication upon completion, requiring minimal to no further editing for style or format.
* **Conciseness and Clarity (Unless Specified Otherwise):** When the user does not provide a specific length, prioritize concise and to-the-point writing, maximizing information density while maintaining clarity.
* **Data-Driven Comparisons with Tables:** **When appropriate and beneficial for enhancing clarity and impact, present data comparisons in well-structured Markdown tables. This is especially encouraged when dealing with numerical data or when a visual comparison can significantly improve the reader's understanding.**
* **Length Adherence:** When the user specifies a length constraint, meticulously stay within reasonable bounds of that specification, ensuring the content is appropriately scaled without sacrificing quality or completeness.
* **Comprehensive Instruction Following:** Pay meticulous attention to all details and nuances provided in the user instructions. Strive to fulfill every aspect of the user's request with the highest degree of accuracy and attention to detail, creating a report that not only meets but exceeds expectations for quality and professionalism.
* **Reference List Formatting:** The reference list at the end must be formatted as follows: `[1] Title (URL, if available)`. **Each reference must be separated by a blank line to ensure proper spacing.** For example:

  ```
  [1] Title 1 (URL1, if available)

  [2] Title 2 (URL2, if available)
  ```

  **Furthermore, ensure that the reference list is free of duplicates. Each unique source should be listed only once, regardless of how many times it is cited in the text.**
* **ABSOLUTE FINAL OUTPUT RESTRICTION:** **Your output must contain ONLY the finished, publication-ready Markdown report. Do not include ANY extraneous text, phrases, preambles, meta-commentary, or markdown code indicators (e.g., "```markdown```"). The report should begin directly with the title and introductory paragraph, and end directly after the conclusion and the reference list (if applicable).** **Your response will be deemed a failure if this instruction is not followed precisely.**

**Inputs:**

1. **User Instruction:** The original instruction given by the user. This helps you determine what kind of information will be useful and how to structure your thinking.
2. **Search Information:** Information gathered from the search queries.
""" history_infos_ = json.dumps(history_infos, indent=4) record_json_path = os.path.join(save_dir, "record_infos.json") logger.info(f"save All recorded information at {record_json_path}") with open(record_json_path, "w") as fw: json.dump(history_infos, fw, indent=4) report_prompt = ( f"User Instruction:{task} \n Search Information:\n {history_infos_}" ) report_messages = [ SystemMessage(content=writer_system_prompt), HumanMessage(content=report_prompt), ] # New context for report generation ai_report_msg = llm.invoke(report_messages) if hasattr(ai_report_msg, "reasoning_content"): logger.info("🤯 Start Report Deep Thinking: ") logger.info(ai_report_msg.reasoning_content) logger.info("🤯 End Report Deep Thinking") report_content = ai_report_msg.content report_file_path = os.path.join(save_dir, "final_report.md") with open(report_file_path, "w", encoding="utf-8") as f: f.write(report_content) logger.info(f"Save Report at: {report_file_path}") return report_content, report_file_path except Exception as e: logger.error(f"Deep research Error: {e}") return "", None finally: if browser: await browser.close() logger.info("Browser closed.") ``` -------------------------------------------------------------------------------- /src/mcp_server_browser_use/agent/custom_agent.py: -------------------------------------------------------------------------------- ```python import json import logging import pdb import traceback from typing import Optional, Type, List, Dict, Any, Callable from PIL import Image, ImageDraw, ImageFont import os import base64 import io import platform from browser_use.agent.prompts import SystemPrompt, AgentMessagePrompt from browser_use.agent.service import Agent from browser_use.agent.views import ( ActionResult, ActionModel, AgentHistoryList, AgentOutput, AgentHistory, ) from browser_use.browser.browser import Browser from browser_use.browser.context import BrowserContext from browser_use.browser.views import BrowserStateHistory from browser_use.controller.service import Controller from browser_use.telemetry.views import ( AgentEndTelemetryEvent, AgentRunTelemetryEvent, AgentStepTelemetryEvent, ) from browser_use.utils import time_execution_async from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import ( BaseMessage, ) from json_repair import repair_json from mcp_server_browser_use.utils.agent_state import AgentState from .custom_massage_manager import CustomMassageManager from .custom_views import CustomAgentOutput, CustomAgentStepInfo logger = logging.getLogger(__name__) class CustomAgent(Agent): def __init__( self, task: str, llm: BaseChatModel, add_infos: str = "", browser: Browser | None = None, browser_context: BrowserContext | None = None, controller: Controller = Controller(), use_vision: bool = True, save_conversation_path: Optional[str] = None, max_failures: int = 5, retry_delay: int = 10, system_prompt_class: Type[SystemPrompt] = SystemPrompt, agent_prompt_class: Type[AgentMessagePrompt] = AgentMessagePrompt, max_input_tokens: int = 128000, validate_output: bool = False, include_attributes: list[str] = [ "title", "type", "name", "role", "tabindex", "aria-label", "placeholder", "value", "alt", "aria-expanded", ], max_error_length: int = 400, max_actions_per_step: int = 10, tool_call_in_content: bool = True, agent_state: AgentState = None, initial_actions: Optional[List[Dict[str, Dict[str, Any]]]] = None, # Cloud Callbacks register_new_step_callback: ( Callable[["BrowserState", "AgentOutput", int], None] | None ) = None, 


class CustomAgent(Agent):
    def __init__(
        self,
        task: str,
        llm: BaseChatModel,
        add_infos: str = "",
        browser: Browser | None = None,
        browser_context: BrowserContext | None = None,
        controller: Controller = Controller(),
        use_vision: bool = True,
        save_conversation_path: Optional[str] = None,
        max_failures: int = 5,
        retry_delay: int = 10,
        system_prompt_class: Type[SystemPrompt] = SystemPrompt,
        agent_prompt_class: Type[AgentMessagePrompt] = AgentMessagePrompt,
        max_input_tokens: int = 128000,
        validate_output: bool = False,
        include_attributes: list[str] = [
            "title",
            "type",
            "name",
            "role",
            "tabindex",
            "aria-label",
            "placeholder",
            "value",
            "alt",
            "aria-expanded",
        ],
        max_error_length: int = 400,
        max_actions_per_step: int = 10,
        tool_call_in_content: bool = True,
        agent_state: AgentState = None,
        initial_actions: Optional[List[Dict[str, Dict[str, Any]]]] = None,
        # Cloud Callbacks
        register_new_step_callback: (
            Callable[["BrowserState", "AgentOutput", int], None] | None
        ) = None,
        register_done_callback: Callable[["AgentHistoryList"], None] | None = None,
        tool_calling_method: Optional[str] = "auto",
    ):
        super().__init__(
            task=task,
            llm=llm,
            browser=browser,
            browser_context=browser_context,
            controller=controller,
            use_vision=use_vision,
            save_conversation_path=save_conversation_path,
            max_failures=max_failures,
            retry_delay=retry_delay,
            system_prompt_class=system_prompt_class,
            max_input_tokens=max_input_tokens,
            validate_output=validate_output,
            include_attributes=include_attributes,
            max_error_length=max_error_length,
            max_actions_per_step=max_actions_per_step,
            tool_call_in_content=tool_call_in_content,
            initial_actions=initial_actions,
            register_new_step_callback=register_new_step_callback,
            register_done_callback=register_done_callback,
            tool_calling_method=tool_calling_method,
        )
        if self.model_name in ["deepseek-reasoner"] or "deepseek-r1" in self.model_name:
            # deepseek-reasoner does not support function calling
            self.use_deepseek_r1 = True
            # deepseek-reasoner only supports a 64,000-token context
            self.max_input_tokens = 64000
        else:
            self.use_deepseek_r1 = False

        # record last actions
        self._last_actions = None
        # record extracted content
        self.extracted_content = ""
        # custom new info
        self.add_infos = add_infos
        # agent_state for stop requests
        self.agent_state = agent_state
        self.agent_prompt_class = agent_prompt_class
        self.message_manager = CustomMassageManager(
            llm=self.llm,
            task=self.task,
            action_descriptions=self.controller.registry.get_prompt_description(),
            system_prompt_class=self.system_prompt_class,
            agent_prompt_class=agent_prompt_class,
            max_input_tokens=self.max_input_tokens,
            include_attributes=self.include_attributes,
            max_error_length=self.max_error_length,
            max_actions_per_step=self.max_actions_per_step,
        )

    def _setup_action_models(self) -> None:
        """Setup dynamic action models from controller's registry"""
        # Get the dynamic action model from controller's registry
        self.ActionModel = self.controller.registry.create_action_model()
        # Create output model with the dynamic actions
        self.AgentOutput = CustomAgentOutput.type_with_custom_actions(self.ActionModel)

    def _log_response(self, response: CustomAgentOutput) -> None:
        """Log the model's response"""
        if "Success" in response.current_state.prev_action_evaluation:
            emoji = "✅"
        elif "Failed" in response.current_state.prev_action_evaluation:
            emoji = "❌"
        else:
            emoji = "🤷"

        logger.info(f"{emoji} Eval: {response.current_state.prev_action_evaluation}")
        logger.info(f"🧠 New Memory: {response.current_state.important_contents}")
        logger.info(f"⏳ Task Progress: \n{response.current_state.task_progress}")
        logger.info(f"📋 Future Plans: \n{response.current_state.future_plans}")
        logger.info(f"🤔 Thought: {response.current_state.thought}")
        logger.info(f"🎯 Summary: {response.current_state.summary}")
        for i, action in enumerate(response.action):
            logger.info(
                f"🛠️ Action {i + 1}/{len(response.action)}: {action.model_dump_json(exclude_unset=True)}"
            )

    def update_step_info(
        self, model_output: CustomAgentOutput, step_info: CustomAgentStepInfo = None
    ):
        """Update step info"""
        if step_info is None:
            return

        step_info.step_number += 1
        important_contents = model_output.current_state.important_contents
        if (
            important_contents
            and "None" not in important_contents
            and important_contents not in step_info.memory
        ):
            step_info.memory += important_contents + "\n"

        task_progress = model_output.current_state.task_progress
        if task_progress and "None" not in task_progress:
            step_info.task_progress = task_progress

        future_plans = model_output.current_state.future_plans
        if future_plans and "None" not in future_plans:
            step_info.future_plans = future_plans
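
    # Note on the "None" checks above: the literal string "None" acts as a
    # sentinel. The custom prompts are expected to instruct the model to
    # emit "None" for fields with no update, so such values are skipped
    # rather than appended to step_info.memory or allowed to overwrite
    # task_progress / future_plans.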

    @time_execution_async("--get_next_action")
    async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutput:
        """Get next action from LLM based on current state"""
        messages_to_process = (
            self.message_manager.merge_successive_human_messages(input_messages)
            if self.use_deepseek_r1
            else input_messages
        )

        ai_message = self.llm.invoke(messages_to_process)
        self.message_manager._add_message_with_tokens(ai_message)

        if self.use_deepseek_r1:
            logger.info("🤯 Start Deep Thinking: ")
            logger.info(ai_message.reasoning_content)
            logger.info("🤯 End Deep Thinking")

        if isinstance(ai_message.content, list):
            ai_content = ai_message.content[0]
        else:
            ai_content = ai_message.content

        ai_content = ai_content.replace("```json", "").replace("```", "")
        ai_content = repair_json(ai_content)
        parsed_json = json.loads(ai_content)
        parsed: AgentOutput = self.AgentOutput(**parsed_json)

        if parsed is None:
            logger.debug(ai_message.content)
            raise ValueError("Could not parse response.")

        # Limit actions to maximum allowed per step
        parsed.action = parsed.action[: self.max_actions_per_step]
        self._log_response(parsed)
        self.n_steps += 1

        return parsed
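
    # deepseek-reasoner / deepseek-r1 style models do not support function
    # calling, so get_next_action above relies on plain-text JSON output
    # (code fences stripped, then passed through repair_json) and merges
    # successive human messages, since those APIs reject consecutive
    # messages with the same role.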

    @time_execution_async("--step")
    async def step(self, step_info: Optional[CustomAgentStepInfo] = None) -> None:
        """Execute one step of the task"""
        logger.info(f"\n📍 Step {self.n_steps}")
        state = None
        model_output = None
        result: list[ActionResult] = []

        try:
            state = await self.browser_context.get_state(use_vision=self.use_vision)
            self.message_manager.add_state_message(
                state, self._last_actions, self._last_result, step_info
            )
            input_messages = self.message_manager.get_messages()
            try:
                model_output = await self.get_next_action(input_messages)
                if self.register_new_step_callback:
                    self.register_new_step_callback(state, model_output, self.n_steps)
                self.update_step_info(model_output, step_info)
                logger.info(f"🧠 All Memory: \n{step_info.memory}")
                self._save_conversation(input_messages, model_output)
                if self.model_name != "deepseek-reasoner":
                    # remove the previous state message from history
                    self.message_manager._remove_state_message_by_index(-1)
            except Exception as e:
                # model call failed, remove last state message from history
                self.message_manager._remove_state_message_by_index(-1)
                raise e

            actions: list[ActionModel] = model_output.action
            result: list[ActionResult] = await self.controller.multi_act(
                actions, self.browser_context
            )
            if len(result) != len(actions):
                # fewer results than requested actions means execution stopped
                # early (e.g. the page changed); surface that to the LLM
                for ri in range(len(result), len(actions)):
                    result.append(
                        ActionResult(
                            extracted_content=None,
                            include_in_memory=True,
                            error=(
                                f"{actions[ri].model_dump_json(exclude_unset=True)} failed to execute. "
                                f"Something new appeared after action {actions[len(result) - 1].model_dump_json(exclude_unset=True)}"
                            ),
                            is_done=False,
                        )
                    )
            if len(actions) == 0:
                # TODO: fix no action case
                result = [
                    ActionResult(
                        is_done=True,
                        extracted_content=step_info.memory,
                        include_in_memory=True,
                    )
                ]

            for ret_ in result:
                if ret_.extracted_content and "Extracted page" in ret_.extracted_content:
                    # record every extracted page
                    self.extracted_content += ret_.extracted_content
            self._last_result = result
            self._last_actions = actions
            if len(result) > 0 and result[-1].is_done:
                if not self.extracted_content:
                    self.extracted_content = step_info.memory
                result[-1].extracted_content = self.extracted_content
                logger.info(f"📄 Result: {result[-1].extracted_content}")

            self.consecutive_failures = 0

        except Exception as e:
            result = await self._handle_step_error(e)
            self._last_result = result

        finally:
            actions = (
                [a.model_dump(exclude_unset=True) for a in model_output.action]
                if model_output
                else []
            )
            self.telemetry.capture(
                AgentStepTelemetryEvent(
                    agent_id=self.agent_id,
                    step=self.n_steps,
                    actions=actions,
                    consecutive_failures=self.consecutive_failures,
                    step_error=(
                        [r.error for r in result if r.error] if result else ["No result"]
                    ),
                )
            )
            if not result:
                return

            if state:
                self._make_history_item(model_output, state, result)

    async def run(self, max_steps: int = 100) -> AgentHistoryList:
        """Execute the task with a maximum number of steps"""
        try:
            self._log_agent_run()

            # Execute initial actions if provided
            if self.initial_actions:
                result = await self.controller.multi_act(
                    self.initial_actions,
                    self.browser_context,
                    check_for_new_elements=False,
                )
                self._last_result = result

            step_info = CustomAgentStepInfo(
                task=self.task,
                add_infos=self.add_infos,
                step_number=1,
                max_steps=max_steps,
                memory="",
                task_progress="",
                future_plans="",
            )

            for step in range(max_steps):
                # 1) Check if a stop was requested
                if self.agent_state and self.agent_state.is_stop_requested():
                    logger.info("🛑 Stop requested by user")
                    self._create_stop_history_item()
                    break

                # 2) Store the last valid state before the step
                if self.browser_context and self.agent_state:
                    state = await self.browser_context.get_state(
                        use_vision=self.use_vision
                    )
                    self.agent_state.set_last_valid_state(state)

                if self._too_many_failures():
                    break

                # 3) Do the step
                await self.step(step_info)
                if self.history.is_done():
                    if (
                        self.validate_output and step < max_steps - 1
                    ):  # if this is the last step, we don't need to validate
                        if not await self._validate_output():
                            continue

                    logger.info("✅ Task completed successfully")
                    break
            else:
                logger.info("❌ Failed to complete task within the maximum number of steps")
                if not self.extracted_content:
                    self.history.history[-1].result[-1].extracted_content = step_info.memory
                else:
                    self.history.history[-1].result[-1].extracted_content = self.extracted_content

            return self.history

        finally:
            self.telemetry.capture(
                AgentEndTelemetryEvent(
                    agent_id=self.agent_id,
                    success=self.history.is_done(),
                    steps=self.n_steps,
                    max_steps_reached=self.n_steps >= max_steps,
                    errors=self.history.errors(),
                )
            )
            if not self.injected_browser_context:
                await self.browser_context.close()

            if not self.injected_browser and self.browser:
                await self.browser.close()

            if self.generate_gif:
                output_path: str = "agent_history.gif"
                if isinstance(self.generate_gif, str):
                    output_path = self.generate_gif
                self.create_history_gif(output_path=output_path)
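
    # Cooperative stop: run() checks agent_state.is_stop_requested() before
    # every step, and the helper below appends a final is_done=True history
    # item built from the last valid browser state, so callers waiting on
    # history.is_done() terminate cleanly.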

    def _create_stop_history_item(self):
        """Create a history item for when the agent is stopped."""
        try:
            # Attempt to retrieve the last valid state from agent_state
            state = None
            if self.agent_state:
                last_state = self.agent_state.get_last_valid_state()
                if last_state:
                    # Convert to BrowserStateHistory
                    state = BrowserStateHistory(
                        url=getattr(last_state, "url", ""),
                        title=getattr(last_state, "title", ""),
                        tabs=getattr(last_state, "tabs", []),
                        interacted_element=[None],
                        screenshot=getattr(last_state, "screenshot", None),
                    )
                else:
                    state = self._create_empty_state()
            else:
                state = self._create_empty_state()

            # Create a final item in the agent history indicating done
            stop_history = AgentHistory(
                model_output=None,
                state=state,
                result=[ActionResult(extracted_content=None, error=None, is_done=True)],
            )
            self.history.history.append(stop_history)

        except Exception as e:
            logger.error(f"Error creating stop history item: {e}")
            # Create empty state as fallback
            state = self._create_empty_state()
            stop_history = AgentHistory(
                model_output=None,
                state=state,
                result=[ActionResult(extracted_content=None, error=None, is_done=True)],
            )
            self.history.history.append(stop_history)

    def _convert_to_browser_state_history(self, browser_state):
        return BrowserStateHistory(
            url=getattr(browser_state, "url", ""),
            title=getattr(browser_state, "title", ""),
            tabs=getattr(browser_state, "tabs", []),
            interacted_element=[None],
            screenshot=getattr(browser_state, "screenshot", None),
        )

    def _create_empty_state(self):
        return BrowserStateHistory(
            url="", title="", tabs=[], interacted_element=[None], screenshot=None
        )

    def create_history_gif(
        self,
        output_path: str = "agent_history.gif",
        duration: int = 3000,
        show_goals: bool = True,
        show_task: bool = True,
        show_logo: bool = False,
        font_size: int = 40,
        title_font_size: int = 56,
        goal_font_size: int = 44,
        margin: int = 40,
        line_spacing: float = 1.5,
    ) -> None:
        """Create a GIF from the agent's history with overlaid task and goal text."""
        if not self.history.history:
            logger.warning("No history to create GIF from")
            return

        images = []
        # if history is empty or the first screenshot is None, we can't create a GIF
        if not self.history.history or not self.history.history[0].state.screenshot:
            logger.warning("No history or first screenshot to create GIF from")
            return

        # Try to load nicer fonts
        try:
            # Try different font options in order of preference
            font_options = ["Helvetica", "Arial", "DejaVuSans", "Verdana"]
            font_loaded = False

            for font_name in font_options:
                try:
                    if platform.system() == "Windows":
                        # Need to specify the absolute font path on Windows
                        font_name = os.path.join(
                            os.getenv("WIN_FONT_DIR", "C:\\Windows\\Fonts"),
                            font_name + ".ttf",
                        )
                    regular_font = ImageFont.truetype(font_name, font_size)
                    title_font = ImageFont.truetype(font_name, title_font_size)
                    goal_font = ImageFont.truetype(font_name, goal_font_size)
                    font_loaded = True
                    break
                except OSError:
                    continue

            if not font_loaded:
                raise OSError("No preferred fonts found")

        except OSError:
            regular_font = ImageFont.load_default()
            title_font = ImageFont.load_default()
            goal_font = regular_font

        # Load logo if requested
        logo = None
        if show_logo:
            try:
                logo = Image.open("./static/browser-use.png")
                # Resize logo to be small (e.g., 150px height)
                logo_height = 150
                aspect_ratio = logo.width / logo.height
                logo_width = int(logo_height * aspect_ratio)
                logo = logo.resize((logo_width, logo_height), Image.Resampling.LANCZOS)
            except Exception as e:
                logger.warning(f"Could not load logo: {e}")

        # Create task frame if requested
        if show_task and self.task:
            task_frame = self._create_task_frame(
                self.task,
                self.history.history[0].state.screenshot,
                title_font,
                regular_font,
                logo,
                line_spacing,
            )
            images.append(task_frame)
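
        # Frame assembly below: the optional task title card comes first,
        # then one annotated screenshot per history step; `duration` is the
        # per-frame display time in milliseconds when the GIF is saved.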

        # Process each history item
        for i, item in enumerate(self.history.history, 1):
            if not item.state.screenshot:
                continue

            # Convert base64 screenshot to PIL Image
            img_data = base64.b64decode(item.state.screenshot)
            image = Image.open(io.BytesIO(img_data))

            if show_goals and item.model_output:
                image = self._add_overlay_to_image(
                    image=image,
                    step_number=i,
                    goal_text=item.model_output.current_state.thought,
                    regular_font=regular_font,
                    title_font=title_font,
                    margin=margin,
                    logo=logo,
                )

            images.append(image)

        if images:
            # Save the GIF
            images[0].save(
                output_path,
                save_all=True,
                append_images=images[1:],
                duration=duration,
                loop=0,
                optimize=False,
            )
            logger.info(f"Created GIF at {output_path}")
        else:
            logger.warning("No images found in history to create GIF")
```
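
To see how the pieces above fit together end to end, the following is a minimal, illustrative driver script. It is not part of the repository; the model name and research task are assumptions, and it presumes `langchain-openai` is installed alongside this package.

```python
# run_deep_research.py -- illustrative sketch only
import asyncio

from langchain_openai import ChatOpenAI

from mcp_server_browser_use.utils.deep_research import deep_research


async def main() -> None:
    # Hypothetical model choice; any LangChain chat model should work here.
    llm = ChatOpenAI(model="gpt-4o")
    report_md, report_path = await deep_research(
        "Compare MCP server implementations for browser automation",
        llm,
        max_search_iterations=3,  # planner/recorder loop bound
        max_query_num=2,          # search queries per iteration
        max_steps=8,              # browser steps per query agent
        headless=True,
    )
    print(f"Report written to: {report_path}")


if __name__ == "__main__":
    asyncio.run(main())
```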