# Directory Structure
```
├── .env.example
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── assets
│   └── header.png
├── CLAUDE.md
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server_browser_use
│       ├── __init__.py
│       ├── __main__.py
│       ├── _internal
│       │   ├── __init__.py
│       │   ├── agent
│       │   │   ├── __init__.py
│       │   │   ├── browser_use
│       │   │   │   └── browser_use_agent.py
│       │   │   └── deep_research
│       │   │       └── deep_research_agent.py
│       │   ├── browser
│       │   │   ├── __init__.py
│       │   │   ├── custom_browser.py
│       │   │   └── custom_context.py
│       │   ├── controller
│       │   │   ├── __init__.py
│       │   │   └── custom_controller.py
│       │   └── utils
│       │       ├── __init__.py
│       │       ├── config.py
│       │       ├── llm_provider.py
│       │       ├── mcp_client.py
│       │       └── utils.py
│       ├── cli.py
│       ├── config.py
│       └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.11
```
--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------
```yaml
fail_fast: true
repos:
  - repo: https://github.com/pre-commit/mirrors-prettier
    rev: v3.1.0
    hooks:
      - id: prettier
        types_or: [yaml, json5]
  # - repo: https://github.com/astral-sh/ruff-pre-commit
  #   rev: v0.8.1
  #   hooks:
  #     - id: ruff-format
  #     - id: ruff
  #       args: [--fix, --exit-non-zero-on-fix]
  - repo: local
    hooks:
      - id: uv-lock-check
        name: Check uv.lock is up to date
        entry: uv lock --check
        language: system
        files: ^(pyproject\.toml|uv\.lock)$
        pass_filenames: false
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,macos,python
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide
# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python
# Custom rules (everything added below won't be overridden by 'Generate .gitignore File' if you use 'Update' option)
agent_history.gif
trace.json
recording.mp4
temp/
tmp/
.vscode/
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# This is an example .env file. Copy it to .env and fill in your values.
# Lines starting with # are comments.
# === Main LLM Configuration (MCP_LLM_*) ===
# Select the primary LLM provider
# Options: openai, azure_openai, anthropic, google, mistral, ollama, deepseek, openrouter, alibaba, moonshot, unbound
MCP_LLM_PROVIDER=google
# Specify the model name for the selected provider
MCP_LLM_MODEL_NAME=gemini-2.5-flash-preview-04-17
# LLM temperature (0.0-2.0). Controls randomness.
MCP_LLM_TEMPERATURE=0.0
# Optional: Generic override for the LLM provider's base URL
# MCP_LLM_BASE_URL=
# Optional: Generic override for the LLM provider's API key. Takes precedence over provider-specific keys.
# MCP_LLM_API_KEY=
# --- Provider Specific API Keys (MCP_LLM_*) ---
# Required unless using Ollama locally without auth or generic MCP_LLM_API_KEY is set
# MCP_LLM_OPENAI_API_KEY=YOUR_OPENAI_API_KEY
# MCP_LLM_ANTHROPIC_API_KEY=YOUR_ANTHROPIC_API_KEY
# MCP_LLM_GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY
# MCP_LLM_AZURE_OPENAI_API_KEY=YOUR_AZURE_OPENAI_API_KEY
# MCP_LLM_DEEPSEEK_API_KEY=YOUR_DEEPSEEK_API_KEY
# MCP_LLM_MISTRAL_API_KEY=YOUR_MISTRAL_API_KEY
# MCP_LLM_OPENROUTER_API_KEY=YOUR_OPENROUTER_API_KEY
# MCP_LLM_ALIBABA_API_KEY=YOUR_ALIBABA_API_KEY
# MCP_LLM_MOONSHOT_API_KEY=YOUR_MOONSHOT_API_KEY
# MCP_LLM_UNBOUND_API_KEY=YOUR_UNBOUND_API_KEY
# --- Provider Specific Endpoints (MCP_LLM_*) ---
# Optional: Override default API endpoints.
# MCP_LLM_OPENAI_ENDPOINT=https://api.openai.com/v1
# MCP_LLM_ANTHROPIC_ENDPOINT=https://api.anthropic.com
# MCP_LLM_AZURE_OPENAI_ENDPOINT=YOUR_AZURE_ENDPOINT # Required if using Azure, e.g., https://your-resource.openai.azure.com/
# MCP_LLM_AZURE_OPENAI_API_VERSION=2025-01-01-preview
# MCP_LLM_DEEPSEEK_ENDPOINT=https://api.deepseek.com
# MCP_LLM_MISTRAL_ENDPOINT=https://api.mistral.ai/v1
# MCP_LLM_OLLAMA_ENDPOINT=http://localhost:11434
# MCP_LLM_OPENROUTER_ENDPOINT=https://openrouter.ai/api/v1
# MCP_LLM_ALIBABA_ENDPOINT=https://dashscope.aliyuncs.com/compatible-mode/v1
# MCP_LLM_MOONSHOT_ENDPOINT=https://api.moonshot.cn/v1
# MCP_LLM_UNBOUND_ENDPOINT=https://api.getunbound.ai
# --- Ollama Specific (MCP_LLM_*) ---
# MCP_LLM_OLLAMA_NUM_CTX=32000
# MCP_LLM_OLLAMA_NUM_PREDICT=1024
# === Planner LLM Configuration (Optional, MCP_LLM_PLANNER_*) ===
# If you want to use a different LLM for planning tasks within agents.
# Defaults to main LLM settings if not specified.
# MCP_LLM_PLANNER_PROVIDER=
# MCP_LLM_PLANNER_MODEL_NAME=
# MCP_LLM_PLANNER_TEMPERATURE=
# MCP_LLM_PLANNER_BASE_URL=
# MCP_LLM_PLANNER_API_KEY= # Generic planner API key, or use provider-specific below
# MCP_LLM_PLANNER_OPENAI_API_KEY=
# ... (similar provider-specific keys and endpoints for planner if needed)
# === Browser Configuration (MCP_BROWSER_*) ===
# General browser headless mode (true/false)
MCP_BROWSER_HEADLESS=false
# General browser disable security features (use cautiously) (true/false)
MCP_BROWSER_DISABLE_SECURITY=false
# Optional: Path to Chrome/Chromium executable
# MCP_BROWSER_BINARY_PATH=/usr/bin/chromium-browser
# Optional: Path to Chrome user data directory (for persistent sessions)
# MCP_BROWSER_USER_DATA_DIR=~/.config/google-chrome/Profile 1
MCP_BROWSER_WINDOW_WIDTH=1280
MCP_BROWSER_WINDOW_HEIGHT=1080
# Set to true to connect to user's browser via MCP_BROWSER_CDP_URL
MCP_BROWSER_USE_OWN_BROWSER=false
# Optional: Connect to existing Chrome via DevTools Protocol URL. Required if MCP_BROWSER_USE_OWN_BROWSER=true.
# MCP_BROWSER_CDP_URL=http://localhost:9222
# MCP_BROWSER_WSS_URL= # Optional: WSS URL if CDP URL is not sufficient
# Keep browser managed by server open between MCP tool calls (if MCP_BROWSER_USE_OWN_BROWSER=false)
MCP_BROWSER_KEEP_OPEN=false
# Optional: Directory to save Playwright trace files (useful for debugging). If not set, tracing to file is disabled.
# MCP_BROWSER_TRACE_PATH=./tmp/trace
# === Agent Tool Configuration (`run_browser_agent` tool, MCP_AGENT_TOOL_*) ===
MCP_AGENT_TOOL_MAX_STEPS=100
MCP_AGENT_TOOL_MAX_ACTIONS_PER_STEP=5
# Method for tool invocation ('auto', 'json_schema', 'function_calling')
MCP_AGENT_TOOL_TOOL_CALLING_METHOD=auto
MCP_AGENT_TOOL_MAX_INPUT_TOKENS=128000
# Enable vision capabilities (screenshot analysis)
MCP_AGENT_TOOL_USE_VISION=true
# Override general browser headless mode for this tool (true/false/empty for general setting)
# MCP_AGENT_TOOL_HEADLESS=
# Override general browser disable security for this tool (true/false/empty for general setting)
# MCP_AGENT_TOOL_DISABLE_SECURITY=
# Enable Playwright video recording (true/false)
MCP_AGENT_TOOL_ENABLE_RECORDING=false
# Optional: Path to save agent run video recordings. If not set, recording to file is disabled even if ENABLE_RECORDING=true.
# MCP_AGENT_TOOL_SAVE_RECORDING_PATH=./tmp/recordings
# Optional: Directory to save agent history JSON files. If not set, history saving is disabled.
# MCP_AGENT_TOOL_HISTORY_PATH=./tmp/agent_history
# === Deep Research Tool Configuration (`run_deep_research` tool, MCP_RESEARCH_TOOL_*) ===
MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS=3
# MANDATORY: Base directory to save research artifacts (report, results). Task ID will be appended.
# Example: MCP_RESEARCH_TOOL_SAVE_DIR=/mnt/data/research_outputs
# Example: MCP_RESEARCH_TOOL_SAVE_DIR=C:\\Users\\YourUser\\Documents\\ResearchData
MCP_RESEARCH_TOOL_SAVE_DIR=./tmp/deep_research
# === Path Configuration (MCP_PATHS_*) ===
# Optional: Directory for downloaded files. If not set, persistent downloads to a specific path are disabled.
# MCP_PATHS_DOWNLOADS=./tmp/downloads
# === Server Configuration (MCP_SERVER_*) ===
# Path for the server log file. Leave empty for stdout.
# MCP_SERVER_LOG_FILE=mcp_server_browser_use.log
# Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
MCP_SERVER_LOGGING_LEVEL=INFO
# Enable/disable anonymized telemetry (true/false)
MCP_SERVER_ANONYMIZED_TELEMETRY=true
# Optional: JSON string for MCP client configuration for the controller
# MCP_SERVER_MCP_CONFIG='{"client_name": "mcp-browser-use-controller"}'
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
<img src="./assets/header.png" alt="Browser Use Web UI" width="full"/>
<br/>
# browser-use MCP server & CLI
[Documentation](https://docs.browser-use.com)
[License: MIT](LICENSE)
> **Project Note**: This MCP server implementation builds upon the [browser-use/web-ui](https://github.com/browser-use/web-ui) foundation. Core browser automation logic and configuration patterns are adapted from the original project.
AI-driven browser automation server implementing the Model Context Protocol (MCP) for natural language browser control and web research. Also provides CLI access to its core functionalities.
<a href="https://glama.ai/mcp/servers/@Saik0s/mcp-browser-use"><img width="380" height="200" src="https://glama.ai/mcp/servers/@Saik0s/mcp-browser-use/badge" alt="Browser-Use MCP server" /></a>
## Features
- 🧠 **MCP Integration** - Full protocol implementation for AI agent communication.
- 🌐 **Browser Automation** - Page navigation, form filling, element interaction via natural language (`run_browser_agent` tool).
- 👁️ **Visual Understanding** - Optional screenshot analysis for vision-capable LLMs.
- 🔄 **State Persistence** - Option to manage a server browser session across multiple MCP calls or connect to user's browser.
- 🔌 **Multi-LLM Support** - Integrates with OpenAI, Anthropic, Azure, DeepSeek, Google, Mistral, Ollama, OpenRouter, Alibaba, Moonshot, Unbound AI.
- 🔍 **Deep Research Tool** - Dedicated tool for multi-step web research and report generation (`run_deep_research` tool).
- ⚙️ **Environment Variable Configuration** - Fully configurable via environment variables using a structured Pydantic model.
- 🔗 **CDP Connection** - Ability to connect to and control a user-launched Chrome/Chromium instance via Chrome DevTools Protocol.
- ⌨️ **CLI Interface** - Access core agent functionalities (`run_browser_agent`, `run_deep_research`) directly from the command line for testing and scripting.
## Quick Start
### The Essentials
1. Install uv - the rocket-powered Python installer:
   `curl -LsSf https://astral.sh/uv/install.sh | sh`
2. Get Playwright browsers (required for automation):
   `uvx --from mcp-server-browser-use@latest python -m playwright install`
### Integration Patterns
For MCP clients like Claude Desktop, add a server configuration that's as simple as:
```json
// Example 1: One-Line Latest Version (Always Fresh)
"mcpServers": {
  "browser-use": {
    "command": "uvx",
    "args": ["mcp-server-browser-use@latest"],
    "env": {
      "MCP_LLM_GOOGLE_API_KEY": "YOUR_KEY_HERE_IF_USING_GOOGLE",
      "MCP_LLM_PROVIDER": "google",
      "MCP_LLM_MODEL_NAME": "gemini-2.5-flash-preview-04-17",
      "MCP_BROWSER_HEADLESS": "true"
    }
  }
}
```
```json
// Example 2: Advanced Configuration with CDP
"mcpServers": {
  "browser-use": {
    "command": "uvx",
    "args": ["mcp-server-browser-use@latest"],
    "env": {
      "MCP_LLM_OPENROUTER_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENROUTER",
      "MCP_LLM_PROVIDER": "openrouter",
      "MCP_LLM_MODEL_NAME": "anthropic/claude-3.5-haiku",
      "MCP_LLM_TEMPERATURE": "0.4",
      "MCP_BROWSER_HEADLESS": "false",
      "MCP_BROWSER_WINDOW_WIDTH": "1440",
      "MCP_BROWSER_WINDOW_HEIGHT": "1080",
      "MCP_AGENT_TOOL_USE_VISION": "true",
      "MCP_RESEARCH_TOOL_SAVE_DIR": "/path/to/your/research",
      "MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS": "5",
      "MCP_PATHS_DOWNLOADS": "/path/to/your/downloads",
      "MCP_BROWSER_USE_OWN_BROWSER": "true",
      "MCP_BROWSER_CDP_URL": "http://localhost:9222",
      "MCP_AGENT_TOOL_HISTORY_PATH": "/path/to/your/history",
      "MCP_SERVER_LOGGING_LEVEL": "DEBUG",
      "MCP_SERVER_LOG_FILE": "/path/to/your/log/mcp_server_browser_use.log"
    }
  }
}
```
```json
// Example 3: Advanced Configuration with User Data and custom Chrome path
"mcpServers": {
  "browser-use": {
    "command": "uvx",
    "args": ["mcp-server-browser-use@latest"],
    "env": {
      "MCP_LLM_OPENAI_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENAI",
      "MCP_LLM_PROVIDER": "openai",
      "MCP_LLM_MODEL_NAME": "gpt-4.1-mini",
      "MCP_LLM_TEMPERATURE": "0.2",
      "MCP_BROWSER_HEADLESS": "false",
      "MCP_BROWSER_BINARY_PATH": "/path/to/your/chrome/binary",
      "MCP_BROWSER_USER_DATA_DIR": "/path/to/your/user/data",
      "MCP_BROWSER_DISABLE_SECURITY": "true",
      "MCP_BROWSER_KEEP_OPEN": "true",
      "MCP_BROWSER_TRACE_PATH": "/path/to/your/trace",
      "MCP_AGENT_TOOL_HISTORY_PATH": "/path/to/your/history",
      "MCP_SERVER_LOGGING_LEVEL": "DEBUG",
      "MCP_SERVER_LOG_FILE": "/path/to/your/log/mcp_server_browser_use.log"
    }
  }
}
```
```json
// Example 4: Local Development Flow
"mcpServers": {
  "browser-use": {
    "command": "uv",
    "args": [
      "--directory",
      "/your/dev/path",
      "run",
      "mcp-server-browser-use"
    ],
    "env": {
      "MCP_LLM_OPENROUTER_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENROUTER",
      "MCP_LLM_PROVIDER": "openrouter",
      "MCP_LLM_MODEL_NAME": "openai/gpt-4o-mini",
      "MCP_BROWSER_HEADLESS": "true"
    }
  }
}
```
**Key Insight:** The best configurations emerge from starting simple (Example 1). The `.env.example` file contains all the possible dials.
## MCP Tools
This server exposes the following tools via the Model Context Protocol:
### Synchronous Tools (Wait for Completion)
1. **`run_browser_agent`**
   * **Description:** Executes a browser automation task based on natural language instructions and waits for it to complete. Uses settings from the `MCP_AGENT_TOOL_*`, `MCP_LLM_*`, and `MCP_BROWSER_*` environment variables.
   * **Arguments:**
     * `task` (string, required): The primary task or objective.
   * **Returns:** (string) The final result extracted by the agent, or an error message. Agent history (JSON, optional GIF) is saved if `MCP_AGENT_TOOL_HISTORY_PATH` is set.
2. **`run_deep_research`**
   * **Description:** Performs in-depth web research on a topic, generates a report, and waits for completion. Uses settings from the `MCP_RESEARCH_TOOL_*`, `MCP_LLM_*`, and `MCP_BROWSER_*` environment variables. If `MCP_RESEARCH_TOOL_SAVE_DIR` is set, outputs are saved to a subdirectory within it; otherwise, the tool operates in memory-only mode.
   * **Arguments:**
     * `research_task` (string, required): The topic or question for the research.
     * `max_parallel_browsers` (integer, optional): Overrides `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS` from the environment.
   * **Returns:** (string) The generated research report in Markdown format, including the file path (if saved), or an error message.
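For a quick sanity check outside a full MCP client, a minimal Python script can invoke these tools over stdio. The sketch below uses the official `mcp` Python SDK; the task text and env values are placeholders, not required settings:
```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch the server the same way an MCP client like Claude Desktop would.
    params = StdioServerParameters(
        command="uvx",
        args=["mcp-server-browser-use@latest"],
        env={
            "MCP_LLM_PROVIDER": "google",  # placeholder provider
            "MCP_LLM_GOOGLE_API_KEY": "YOUR_KEY",  # placeholder key
            "MCP_BROWSER_HEADLESS": "true",
        },
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "run_browser_agent",
                {"task": "Go to example.com and return the page title."},
            )
            print(result.content)


asyncio.run(main())
```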
## CLI Usage
This package also provides a command-line interface `mcp-browser-cli` for direct testing and scripting.
**Global Options:**
* `--env-file PATH, -e PATH`: Path to a `.env` file to load configurations from.
* `--log-level LEVEL, -l LEVEL`: Override the logging level (e.g., `DEBUG`, `INFO`).
**Commands:**
1. **`mcp-browser-cli run-browser-agent [OPTIONS] TASK`**
   * **Description:** Runs a browser agent task.
   * **Arguments:**
     * `TASK` (string, required): The primary task for the agent.
   * **Example:**
     ```bash
     mcp-browser-cli run-browser-agent "Go to example.com and find the title." -e .env
     ```
2. **`mcp-browser-cli run-deep-research [OPTIONS] RESEARCH_TASK`**
   * **Description:** Performs deep web research.
   * **Arguments:**
     * `RESEARCH_TASK` (string, required): The topic or question for research.
   * **Options:**
     * `--max-parallel-browsers INTEGER, -p INTEGER`: Override `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS`.
   * **Example:**
     ```bash
     mcp-browser-cli run-deep-research "What are the latest advancements in AI-driven browser automation?" --max-parallel-browsers 5 -e .env
     ```
All other configurations (LLM keys, paths, browser settings) are picked up from environment variables (or the specified `.env` file) as detailed in the Configuration section.
## Configuration (Environment Variables)
Configure the server and CLI using environment variables. You can set these in your system or place them in a `.env` file in the project root (use `--env-file` for CLI). Variables are structured with prefixes.
| Variable Group (Prefix) | Example Variable | Description | Default Value |
| :---------------------------------- | :--------------------------------------------- | :--------------------------------------------------------------------------------------------------------- | :-------------------------------- |
| **Main LLM (MCP_LLM_)** | | Settings for the primary LLM used by agents. | |
| | `MCP_LLM_PROVIDER` | LLM provider. Options: `openai`, `azure_openai`, `anthropic`, `google`, `mistral`, `ollama`, etc. | `openai` |
| | `MCP_LLM_MODEL_NAME` | Specific model name for the provider. | `gpt-4.1` |
| | `MCP_LLM_TEMPERATURE` | LLM temperature (0.0-2.0). | `0.0` |
| | `MCP_LLM_BASE_URL` | Optional: Generic override for LLM provider's base URL. | Provider-specific |
| | `MCP_LLM_API_KEY` | Optional: Generic LLM API key (takes precedence). | - |
| | `MCP_LLM_OPENAI_API_KEY` | API Key for OpenAI (if provider is `openai`). | - |
| | `MCP_LLM_ANTHROPIC_API_KEY` | API Key for Anthropic. | - |
| | `MCP_LLM_GOOGLE_API_KEY` | API Key for Google AI (Gemini). | - |
| | `MCP_LLM_AZURE_OPENAI_API_KEY` | API Key for Azure OpenAI. | - |
| | `MCP_LLM_AZURE_OPENAI_ENDPOINT` | **Required if using Azure.** Your Azure resource endpoint. | - |
| | `MCP_LLM_OLLAMA_ENDPOINT` | Ollama API endpoint URL. | `http://localhost:11434` |
| | `MCP_LLM_OLLAMA_NUM_CTX` | Context window size for Ollama models. | `32000` |
| **Planner LLM (MCP_LLM_PLANNER_)** | | Optional: Settings for a separate LLM for agent planning. Defaults to Main LLM if not set. | |
| | `MCP_LLM_PLANNER_PROVIDER` | Planner LLM provider. | Main LLM Provider |
| | `MCP_LLM_PLANNER_MODEL_NAME` | Planner LLM model name. | Main LLM Model |
| **Browser (MCP_BROWSER_)** | | General browser settings. | |
| | `MCP_BROWSER_HEADLESS` | Run browser without UI (general setting). | `false` |
| | `MCP_BROWSER_DISABLE_SECURITY` | Disable browser security features (general setting, use cautiously). | `false` |
| | `MCP_BROWSER_BINARY_PATH` | Path to Chrome/Chromium executable. | - |
| | `MCP_BROWSER_USER_DATA_DIR` | Path to Chrome user data directory. | - |
| | `MCP_BROWSER_WINDOW_WIDTH` | Browser window width (pixels). | `1280` |
| | `MCP_BROWSER_WINDOW_HEIGHT` | Browser window height (pixels). | `1080` |
| | `MCP_BROWSER_USE_OWN_BROWSER` | Connect to user's browser via CDP URL. | `false` |
| | `MCP_BROWSER_CDP_URL` | CDP URL (e.g., `http://localhost:9222`). Required if `MCP_BROWSER_USE_OWN_BROWSER=true`. | - |
| | `MCP_BROWSER_KEEP_OPEN` | Keep server-managed browser open between MCP calls (if `MCP_BROWSER_USE_OWN_BROWSER=false`). | `false` |
| | `MCP_BROWSER_TRACE_PATH` | Optional: Directory to save Playwright trace files. If not set, tracing to file is disabled. | ` ` (empty, tracing disabled) |
| **Agent Tool (MCP_AGENT_TOOL_)** | | Settings for the `run_browser_agent` tool. | |
| | `MCP_AGENT_TOOL_MAX_STEPS` | Max steps per agent run. | `100` |
| | `MCP_AGENT_TOOL_MAX_ACTIONS_PER_STEP` | Max actions per agent step. | `5` |
| | `MCP_AGENT_TOOL_TOOL_CALLING_METHOD` | Method for tool invocation ('auto', 'json_schema', 'function_calling'). | `auto` |
| | `MCP_AGENT_TOOL_MAX_INPUT_TOKENS` | Max input tokens for LLM context. | `128000` |
| | `MCP_AGENT_TOOL_USE_VISION` | Enable vision capabilities (screenshot analysis). | `true` |
| | `MCP_AGENT_TOOL_HEADLESS` | Override `MCP_BROWSER_HEADLESS` for this tool (true/false/empty). | ` ` (uses general) |
| | `MCP_AGENT_TOOL_DISABLE_SECURITY` | Override `MCP_BROWSER_DISABLE_SECURITY` for this tool (true/false/empty). | ` ` (uses general) |
| | `MCP_AGENT_TOOL_ENABLE_RECORDING` | Enable Playwright video recording. | `false` |
| | `MCP_AGENT_TOOL_SAVE_RECORDING_PATH` | Optional: Path to save recordings. If not set, recording to file is disabled even if `ENABLE_RECORDING=true`. | ` ` (empty, recording disabled) |
| | `MCP_AGENT_TOOL_HISTORY_PATH` | Optional: Directory to save agent history JSON files. If not set, history saving is disabled. | ` ` (empty, history saving disabled) |
| **Research Tool (MCP_RESEARCH_TOOL_)** | | Settings for the `run_deep_research` tool. | |
| | `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS` | Max parallel browser instances for deep research. | `3` |
| | `MCP_RESEARCH_TOOL_SAVE_DIR` | Optional: Base directory to save research artifacts. Task ID will be appended. If not set, operates in memory-only mode. | `None` |
| **Paths (MCP_PATHS_)** | | General path settings. | |
| | `MCP_PATHS_DOWNLOADS` | Optional: Directory for downloaded files. If not set, persistent downloads to a specific path are disabled. | ` ` (empty, downloads disabled) |
| **Server (MCP_SERVER_)** | | Server-specific settings. | |
| | `MCP_SERVER_LOG_FILE` | Path for the server log file. Empty for stdout. | ` ` (empty, logs to stdout) |
| | `MCP_SERVER_LOGGING_LEVEL` | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`). | `ERROR` |
| | `MCP_SERVER_ANONYMIZED_TELEMETRY` | Enable/disable anonymized telemetry (`true`/`false`). | `true` |
| | `MCP_SERVER_MCP_CONFIG` | Optional: JSON string for MCP client config used by the internal controller. | `null` |
**Supported LLM Providers (`MCP_LLM_PROVIDER`):**
`openai`, `azure_openai`, `anthropic`, `google`, `mistral`, `ollama`, `deepseek`, `openrouter`, `alibaba`, `moonshot`, `unbound`
*(Refer to `.env.example` for a comprehensive list of all supported environment variables and their specific provider keys/endpoints.)*
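The prefix-to-setting mapping works roughly as sketched below with `pydantic-settings` (already a project dependency). This is an illustration of the pattern, not the repo's actual `config.py`; the field names mirror the table above:
```python
# Minimal sketch: how MCP_LLM_* / MCP_BROWSER_* env vars map onto settings models.
from pydantic_settings import BaseSettings, SettingsConfigDict


class LLMSettings(BaseSettings):
    # Reads MCP_LLM_PROVIDER, MCP_LLM_MODEL_NAME, MCP_LLM_TEMPERATURE, ...
    model_config = SettingsConfigDict(env_prefix="MCP_LLM_", env_file=".env", extra="ignore")
    provider: str = "openai"
    model_name: str = "gpt-4.1"
    temperature: float = 0.0


class BrowserSettings(BaseSettings):
    # Reads MCP_BROWSER_HEADLESS, MCP_BROWSER_WINDOW_WIDTH, ...
    model_config = SettingsConfigDict(env_prefix="MCP_BROWSER_", env_file=".env", extra="ignore")
    headless: bool = False
    window_width: int = 1280
    window_height: int = 1080


if __name__ == "__main__":
    llm = LLMSettings()
    print(llm.provider, llm.model_name, llm.temperature)
```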
## Connecting to Your Own Browser (CDP)
Instead of having the server launch and manage its own browser instance, you can connect it to a Chrome/Chromium browser that you launch and manage yourself.
**Steps:**
1. **Launch Chrome/Chromium with Remote Debugging Enabled:**
   (Commands for macOS, Linux, Windows as previously listed, e.g., `google-chrome --remote-debugging-port=9222`)
2. **Configure Environment Variables:**
   Set the following environment variables:
   ```dotenv
   MCP_BROWSER_USE_OWN_BROWSER=true
   MCP_BROWSER_CDP_URL=http://localhost:9222 # Use the same port
   # Optional: MCP_BROWSER_USER_DATA_DIR=/path/to/your/profile
   ```
3. **Run the MCP Server or CLI:**
   Start the server (`uv run mcp-server-browser-use`) or CLI (`mcp-browser-cli ...`) as usual.
**Important Considerations:**
* The browser launched with `--remote-debugging-port` must remain open.
* Settings like `MCP_BROWSER_HEADLESS` and `MCP_BROWSER_KEEP_OPEN` are ignored when `MCP_BROWSER_USE_OWN_BROWSER=true`.
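Before starting the server, it can help to confirm the DevTools endpoint is actually reachable. A small illustrative check (adjust the port to match `MCP_BROWSER_CDP_URL`; assumes `requests` is available):
```python
# Probe Chrome's DevTools HTTP endpoint; /json/version is served by any
# Chrome/Chromium started with --remote-debugging-port.
import requests

resp = requests.get("http://localhost:9222/json/version", timeout=5)
resp.raise_for_status()
info = resp.json()
print(info["Browser"])               # e.g. "Chrome/124.0.x"
print(info["webSocketDebuggerUrl"])  # the underlying CDP websocket
```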
## Development
```bash
# Install dev dependencies and sync project deps
uv sync --dev
# Install playwright browsers
uv run playwright install
# Run MCP server with debugger (Example connecting to own browser via CDP)
# 1. Launch Chrome: google-chrome --remote-debugging-port=9222 --user-data-dir="optional/path/to/user/profile"
# 2. Run inspector command with environment variables:
npx @modelcontextprotocol/inspector@latest \
-e MCP_LLM_GOOGLE_API_KEY=$GOOGLE_API_KEY \
-e MCP_LLM_PROVIDER=google \
-e MCP_LLM_MODEL_NAME=gemini-2.5-flash-preview-04-17 \
-e MCP_BROWSER_USE_OWN_BROWSER=true \
-e MCP_BROWSER_CDP_URL=http://localhost:9222 \
-e MCP_RESEARCH_TOOL_SAVE_DIR=./tmp/dev_research_output \
uv --directory . run mcp-server-browser-use
# Note: Change timeout in inspector's config panel if needed (default is 10 seconds)
# Run CLI example
# Create a .env file with your settings (including MCP_RESEARCH_TOOL_SAVE_DIR) or use environment variables
uv run mcp-browser-cli -e .env run-browser-agent "What is the title of example.com?"
uv run mcp-browser-cli -e .env run-deep-research "What is the best material for a pan for everyday use on amateur kitchen and dishwasher?"
```
## Troubleshooting
- **Configuration Error on Startup**: If the application fails to start with an error about a missing setting, ensure all **mandatory** environment variables (like `MCP_RESEARCH_TOOL_SAVE_DIR`) are set correctly in your environment or `.env` file.
- **Browser Conflicts**: If *not* using CDP (`MCP_BROWSER_USE_OWN_BROWSER=false`), ensure no conflicting Chrome instances are running with the same user data directory if `MCP_BROWSER_USER_DATA_DIR` is specified.
- **CDP Connection Issues**: If using `MCP_BROWSER_USE_OWN_BROWSER=true`:
  * Verify Chrome was launched with `--remote-debugging-port`.
  * Ensure the port in `MCP_BROWSER_CDP_URL` matches.
  * Check firewalls and ensure the browser is running.
- **API Errors**: Double-check API keys (`MCP_LLM_<PROVIDER>_API_KEY` or `MCP_LLM_API_KEY`) and endpoints (e.g., `MCP_LLM_AZURE_OPENAI_ENDPOINT` for Azure).
- **Vision Issues**: Ensure `MCP_AGENT_TOOL_USE_VISION=true` and your LLM supports vision.
- **Dependency Problems**: Run `uv sync` and `uv run playwright install`.
- **File/Path Issues**:
  * If optional features like history saving, tracing, or downloads are not working, ensure the corresponding path variables (`MCP_AGENT_TOOL_HISTORY_PATH`, `MCP_BROWSER_TRACE_PATH`, `MCP_PATHS_DOWNLOADS`) are set and the application has write permissions to those locations.
  * For deep research, ensure `MCP_RESEARCH_TOOL_SAVE_DIR` is set to a valid, writable directory.
- **Logging**: Check the log file (`MCP_SERVER_LOG_FILE`, if set) or console output. Increase `MCP_SERVER_LOGGING_LEVEL` to `DEBUG` for more details. For CLI, use `--log-level DEBUG`.
## License
MIT - See [LICENSE](LICENSE) for details.
```
--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------
```markdown
# Development Guidelines
This document contains critical information about working with this codebase. Follow these guidelines precisely.
## Core Development Rules
1. Package Management
   - ONLY use uv, NEVER pip
   - Installation: `uv add package`
   - Running tools: `uv run tool`
   - Upgrading: `uv add --dev package --upgrade-package package`
   - FORBIDDEN: `uv pip install`, `@latest` syntax
2. Code Quality
   - Type hints required for all code
   - Public APIs must have docstrings
   - Functions must be focused and small
   - Follow existing patterns exactly
   - Line length: 150 chars maximum
3. Testing Requirements
   - Framework: `uv run pytest`
   - Async testing: use anyio, not asyncio (see the sketch below)
   - Coverage: test edge cases and errors
   - New features require tests
   - Bug fixes require regression tests
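A minimal example of the anyio style (an illustrative sketch, not an existing test in this repo):
```python
import anyio
import pytest


@pytest.mark.anyio  # anyio's pytest plugin runs this against its async backend
async def test_timeout_guard() -> None:
    # fail_after raises TimeoutError if the block exceeds the deadline
    with anyio.fail_after(1):
        await anyio.sleep(0)
```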
## Python Tools
## Code Formatting
1. Ruff
   - Format: `uv run ruff format .`
   - Check: `uv run ruff check .`
   - Fix: `uv run ruff check . --fix`
   - Critical issues:
     - Line length (150 chars)
     - Import sorting (I001)
     - Unused imports
   - Line wrapping:
     - Strings: use parentheses
     - Function calls: multi-line with proper indent
     - Imports: split into multiple lines
2. Type Checking
   - Tool: `uv run pyright`
   - Requirements:
     - Explicit None checks for Optional
     - Type narrowing for strings
     - Version warnings can be ignored if checks pass
3. Pre-commit
   - Config: `.pre-commit-config.yaml`
   - Runs: on git commit
   - Tools: Prettier (YAML/JSON), Ruff (Python)
   - Ruff updates:
     - Check PyPI versions
     - Update config rev
     - Commit config first
## Error Resolution
1. CI Failures
   - Fix order:
     1. Formatting
     2. Type errors
     3. Linting
   - Type errors:
     - Get full line context
     - Check Optional types
     - Add type narrowing
     - Verify function signatures
2. Common Issues
   - Line length (150 chars):
     - Break strings with parentheses
     - Multi-line function calls
     - Split imports
   - Types:
     - Add None checks
     - Narrow string types
     - Match existing patterns
3. Best Practices
   - Check git status before commits
   - Run formatters before type checks
   - Keep changes minimal
   - Follow existing patterns
   - Document public APIs
   - Test thoroughly
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/controller/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/__init__.py:
--------------------------------------------------------------------------------
```python
from mcp_server_browser_use.server import main

if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/__main__.py:
--------------------------------------------------------------------------------
```python
from mcp_server_browser_use.server import main

if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/utils.py:
--------------------------------------------------------------------------------
```python
import base64
import os
import time
from pathlib import Path
from typing import Dict, Optional
import requests
import json
import uuid
def encode_image(img_path):
    if not img_path:
        return None
    with open(img_path, "rb") as fin:
        image_data = base64.b64encode(fin.read()).decode("utf-8")
    return image_data


def get_latest_files(directory: str, file_types: list = ['.webm', '.zip']) -> Dict[str, Optional[str]]:
    """Get the latest recording and trace files"""
    latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
        return latest_files
    for file_type in file_types:
        try:
            matches = list(Path(directory).rglob(f"*{file_type}"))
            if matches:
                latest = max(matches, key=lambda p: p.stat().st_mtime)
                # Only return files that are complete (not being written)
                if time.time() - latest.stat().st_mtime > 1.0:
                    latest_files[file_type] = str(latest)
        except Exception as e:
            print(f"Error getting latest {file_type} file: {e}")
    return latest_files
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp_server_browser_use"
version = "0.1.8"
description = "MCP server for browser-use"
readme = "README.md"
requires-python = ">=3.11"
authors = [{ name = "Igor Tarasenko" }]
license = { text = "MIT" }
classifiers = [
    "Development Status :: 4 - Beta",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.11",
    "Operating System :: OS Independent",
]
dependencies = [
    "pydantic-settings>=2.0.0",
    "mcp>=1.6.0",
    "typer>=0.12.0",
    "browser-use==0.1.41",
    "pyperclip==1.9.0",
    "json-repair",
    "langchain-mistralai==0.2.4",
    "MainContentExtractor==0.0.4",
    "langchain-ibm==0.3.10",
    "langchain_mcp_adapters==0.0.9",
    "langgraph==0.3.34",
    "langchain-community",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_browser_use"]
[project.scripts]
mcp-server-browser-use = "mcp_server_browser_use.server:main"
mcp-browser-cli = "mcp_server_browser_use.cli:app"
[tool.pyright]
include = ["src/mcp_server_browser_use"]
venvPath = "."
venv = ".venv"
[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []
[tool.ruff]
line-length = 150
target-version = "py311"
[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/config.py:
--------------------------------------------------------------------------------
```python
PROVIDER_DISPLAY_NAMES = {
    "openai": "OpenAI",
    "azure_openai": "Azure OpenAI",
    "anthropic": "Anthropic",
    "deepseek": "DeepSeek",
    "google": "Google",
    "alibaba": "Alibaba",
    "moonshot": "MoonShot",
    "unbound": "Unbound AI",
    "ibm": "IBM",
}

# Predefined model names for common providers
model_names = {
    "anthropic": ["claude-3-5-sonnet-20241022", "claude-3-5-sonnet-20240620", "claude-3-opus-20240229"],
    "openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo", "o3-mini"],
    "deepseek": ["deepseek-chat", "deepseek-reasoner"],
    "google": ["gemini-2.0-flash", "gemini-2.0-flash-thinking-exp", "gemini-1.5-flash-latest",
               "gemini-1.5-flash-8b-latest", "gemini-2.0-flash-thinking-exp-01-21", "gemini-2.0-pro-exp-02-05",
               "gemini-2.5-pro-preview-03-25", "gemini-2.5-flash-preview-04-17"],
    "ollama": ["qwen2.5:7b", "qwen2.5:14b", "qwen2.5:32b", "qwen2.5-coder:14b", "qwen2.5-coder:32b", "llama2:7b",
               "deepseek-r1:14b", "deepseek-r1:32b"],
    "azure_openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"],
    "mistral": ["pixtral-large-latest", "mistral-large-latest", "mistral-small-latest", "ministral-8b-latest"],
    "alibaba": ["qwen-plus", "qwen-max", "qwen-vl-max", "qwen-vl-plus", "qwen-turbo", "qwen-long"],
    "moonshot": ["moonshot-v1-32k-vision-preview", "moonshot-v1-8k-vision-preview"],
    "unbound": ["gemini-2.0-flash", "gpt-4o-mini", "gpt-4o", "gpt-4.5-preview"],
    "siliconflow": [
        "deepseek-ai/DeepSeek-R1",
        "deepseek-ai/DeepSeek-V3",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
        "deepseek-ai/DeepSeek-V2.5",
        "deepseek-ai/deepseek-vl2",
        "Qwen/Qwen2.5-72B-Instruct-128K",
        "Qwen/Qwen2.5-72B-Instruct",
        "Qwen/Qwen2.5-32B-Instruct",
        "Qwen/Qwen2.5-14B-Instruct",
        "Qwen/Qwen2.5-7B-Instruct",
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "Qwen/Qwen2.5-Coder-7B-Instruct",
        "Qwen/Qwen2-7B-Instruct",
        "Qwen/Qwen2-1.5B-Instruct",
        "Qwen/QwQ-32B-Preview",
        "Qwen/Qwen2-VL-72B-Instruct",
        "Qwen/Qwen2.5-VL-32B-Instruct",
        "Qwen/Qwen2.5-VL-72B-Instruct",
        "TeleAI/TeleChat2",
        "THUDM/glm-4-9b-chat",
        "Vendor-A/Qwen/Qwen2.5-72B-Instruct",
        "internlm/internlm2_5-7b-chat",
        "internlm/internlm2_5-20b-chat",
        "Pro/Qwen/Qwen2.5-7B-Instruct",
        "Pro/Qwen/Qwen2-7B-Instruct",
        "Pro/Qwen/Qwen2-1.5B-Instruct",
        "Pro/THUDM/chatglm3-6b",
        "Pro/THUDM/glm-4-9b-chat",
    ],
    "ibm": ["ibm/granite-vision-3.1-2b-preview", "meta-llama/llama-4-maverick-17b-128e-instruct-fp8",
            "meta-llama/llama-3-2-90b-vision-instruct"],
}
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/custom_context.py:
--------------------------------------------------------------------------------
```python
import json
import logging
import os
from browser_use.browser.browser import Browser, IN_DOCKER
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from playwright.async_api import Browser as PlaywrightBrowser
from playwright.async_api import BrowserContext as PlaywrightBrowserContext
from typing import Optional
from browser_use.browser.context import BrowserContextState
logger = logging.getLogger(__name__)
class CustomBrowserContextConfig(BrowserContextConfig):
    force_new_context: bool = False  # force to create new context


class CustomBrowserContext(BrowserContext):
    def __init__(
        self,
        browser: 'Browser',
        config: BrowserContextConfig | None = None,
        state: Optional[BrowserContextState] = None,
    ):
        super(CustomBrowserContext, self).__init__(browser=browser, config=config, state=state)

    async def _create_context(self, browser: PlaywrightBrowser):
        """Creates a new browser context with anti-detection measures and loads cookies if available."""
        if not self.config.force_new_context and self.browser.config.cdp_url and len(browser.contexts) > 0:
            context = browser.contexts[0]
        elif not self.config.force_new_context and self.browser.config.browser_binary_path and len(browser.contexts) > 0:
            # Connect to existing Chrome instance instead of creating new one
            context = browser.contexts[0]
        else:
            # Original code for creating new context
            context = await browser.new_context(
                no_viewport=True,
                user_agent=self.config.user_agent,
                java_script_enabled=True,
                bypass_csp=self.config.disable_security,
                ignore_https_errors=self.config.disable_security,
                record_video_dir=self.config.save_recording_path,
                record_video_size=self.config.browser_window_size.model_dump(),
                record_har_path=self.config.save_har_path,
                locale=self.config.locale,
                http_credentials=self.config.http_credentials,
                is_mobile=self.config.is_mobile,
                has_touch=self.config.has_touch,
                geolocation=self.config.geolocation,
                permissions=self.config.permissions,
                timezone_id=self.config.timezone_id,
            )
        if self.config.trace_path:
            await context.tracing.start(screenshots=True, snapshots=True, sources=True)
        # Load cookies if they exist
        if self.config.cookies_file and os.path.exists(self.config.cookies_file):
            with open(self.config.cookies_file, 'r') as f:
                try:
                    cookies = json.load(f)
                    valid_same_site_values = ['Strict', 'Lax', 'None']
                    for cookie in cookies:
                        if 'sameSite' in cookie:
                            if cookie['sameSite'] not in valid_same_site_values:
                                logger.warning(
                                    f"Fixed invalid sameSite value '{cookie['sameSite']}' to 'None' for cookie {cookie.get('name')}"
                                )
                                cookie['sameSite'] = 'None'
                    logger.info(f'🍪 Loaded {len(cookies)} cookies from {self.config.cookies_file}')
                    await context.add_cookies(cookies)
                except json.JSONDecodeError as e:
                    logger.error(f'Failed to parse cookies file: {str(e)}')
        # Expose anti-detection scripts
        await context.add_init_script(
            """
            // Webdriver property
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // Languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US']
            });
            // Plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            // Chrome runtime
            window.chrome = { runtime: {} };
            // Permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
            (function () {
                const originalAttachShadow = Element.prototype.attachShadow;
                Element.prototype.attachShadow = function attachShadow(options) {
                    return originalAttachShadow.call(this, { ...options, mode: "open" });
                };
            })();
            """
        )
        return context
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/custom_browser.py:
--------------------------------------------------------------------------------
```python
import asyncio
import gc
import pdb
from playwright.async_api import Browser as PlaywrightBrowser
from playwright.async_api import (
    BrowserContext as PlaywrightBrowserContext,
)
from playwright.async_api import (
    Playwright,
    async_playwright,
)
from browser_use.browser.browser import Browser, IN_DOCKER
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from playwright.async_api import BrowserContext as PlaywrightBrowserContext
import logging
from browser_use.browser.chrome import (
    CHROME_ARGS,
    CHROME_DETERMINISTIC_RENDERING_ARGS,
    CHROME_DISABLE_SECURITY_ARGS,
    CHROME_DOCKER_ARGS,
    CHROME_HEADLESS_ARGS,
)
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.utils.screen_resolution import get_screen_resolution, get_window_adjustments
from browser_use.utils import time_execution_async
import socket
from .custom_context import CustomBrowserContext, CustomBrowserContextConfig
logger = logging.getLogger(__name__)
class CustomBrowser(Browser):
    async def new_context(self, config: CustomBrowserContextConfig | None = None) -> CustomBrowserContext:
        """Create a browser context"""
        browser_config = self.config.model_dump() if self.config else {}
        context_config = config.model_dump() if config else {}
        merged_config = {**browser_config, **context_config}
        return CustomBrowserContext(config=CustomBrowserContextConfig(**merged_config), browser=self)

    async def _setup_builtin_browser(self, playwright: Playwright) -> PlaywrightBrowser:
        """Sets up and returns a Playwright Browser instance with anti-detection measures."""
        assert self.config.browser_binary_path is None, 'browser_binary_path should be None if trying to use the builtin browsers'
        if self.config.headless:
            screen_size = {'width': 1920, 'height': 1080}
            offset_x, offset_y = 0, 0
        else:
            screen_size = get_screen_resolution()
            offset_x, offset_y = get_window_adjustments()
        chrome_args = {
            *CHROME_ARGS,
            *(CHROME_DOCKER_ARGS if IN_DOCKER else []),
            *(CHROME_HEADLESS_ARGS if self.config.headless else []),
            *(CHROME_DISABLE_SECURITY_ARGS if self.config.disable_security else []),
            *(CHROME_DETERMINISTIC_RENDERING_ARGS if self.config.deterministic_rendering else []),
            f'--window-position={offset_x},{offset_y}',
            *self.config.extra_browser_args,
        }
        contain_window_size = False
        for arg in self.config.extra_browser_args:
            if "--window-size" in arg:
                contain_window_size = True
                break
        if not contain_window_size:
            chrome_args.add(f'--window-size={screen_size["width"]},{screen_size["height"]}')
        # check if port 9222 is already taken, if so remove the remote-debugging-port arg to prevent conflicts
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            if s.connect_ex(('localhost', 9222)) == 0:
                chrome_args.remove('--remote-debugging-port=9222')
        browser_class = getattr(playwright, self.config.browser_class)
        args = {
            'chromium': list(chrome_args),
            'firefox': [
                *{
                    '-no-remote',
                    *self.config.extra_browser_args,
                }
            ],
            'webkit': [
                *{
                    '--no-startup-window',
                    *self.config.extra_browser_args,
                }
            ],
        }
        browser = await browser_class.launch(
            headless=self.config.headless,
            args=args[self.config.browser_class],
            proxy=self.config.proxy.model_dump() if self.config.proxy else None,
            handle_sigterm=False,
            handle_sigint=False,
        )
        return browser

    async def _close_without_httpxclients(self):
        if self.config.keep_alive:
            return
        try:
            if self.playwright_browser:
                await self.playwright_browser.close()
                del self.playwright_browser
            if self.playwright:
                await self.playwright.stop()
                del self.playwright
            if chrome_proc := getattr(self, '_chrome_subprocess', None):
                try:
                    # always kill all children processes, otherwise chrome leaves a bunch of zombie processes
                    for proc in chrome_proc.children(recursive=True):
                        proc.kill()
                    chrome_proc.kill()
                except Exception as e:
                    logger.debug(f'Failed to terminate chrome subprocess: {e}')
        except Exception as e:
            logger.debug(f'Failed to close browser properly: {e}')
        finally:
            self.playwright_browser = None
            self.playwright = None
            self._chrome_subprocess = None
            gc.collect()
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/browser_use/browser_use_agent.py:
--------------------------------------------------------------------------------
```python
from __future__ import annotations
import asyncio
import gc
import inspect
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, TypeVar, Union
from dotenv import load_dotenv
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    BaseMessage,
    HumanMessage,
    SystemMessage,
)

# from lmnr.sdk.decorators import observe
from pydantic import BaseModel, ValidationError

from browser_use.agent.gif import create_history_gif
from browser_use.agent.memory.service import Memory, MemorySettings
from browser_use.agent.message_manager.service import MessageManager, MessageManagerSettings
from browser_use.agent.message_manager.utils import convert_input_messages, extract_json_from_model_output, save_conversation
from browser_use.agent.prompts import AgentMessagePrompt, PlannerPrompt, SystemPrompt
from browser_use.agent.views import (
    REQUIRED_LLM_API_ENV_VARS,
    ActionResult,
    AgentError,
    AgentHistory,
    AgentHistoryList,
    AgentOutput,
    AgentSettings,
    AgentState,
    AgentStepInfo,
    StepMetadata,
    ToolCallingMethod,
)
from browser_use.browser.browser import Browser
from browser_use.browser.context import BrowserContext
from browser_use.browser.views import BrowserState, BrowserStateHistory
from browser_use.controller.registry.views import ActionModel
from browser_use.controller.service import Controller
from browser_use.dom.history_tree_processor.service import (
    DOMHistoryElement,
    HistoryTreeProcessor,
)
from browser_use.exceptions import LLMException
from browser_use.telemetry.service import ProductTelemetry
from browser_use.telemetry.views import (
    AgentEndTelemetryEvent,
    AgentRunTelemetryEvent,
    AgentStepTelemetryEvent,
)
from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync
from browser_use.agent.service import Agent, AgentHookFunc

load_dotenv()

logger = logging.getLogger(__name__)

SKIP_LLM_API_KEY_VERIFICATION = os.environ.get('SKIP_LLM_API_KEY_VERIFICATION', 'false').lower()[0] in 'ty1'
class BrowserUseAgent(Agent):
    @time_execution_async('--run (agent)')
    async def run(
        self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None,
        on_step_end: AgentHookFunc | None = None
    ) -> AgentHistoryList:
        """Execute the task with maximum number of steps"""
        loop = asyncio.get_event_loop()

        # Set up the Ctrl+C signal handler with callbacks specific to this agent
        from browser_use.utils import SignalHandler

        signal_handler = SignalHandler(
            loop=loop,
            pause_callback=self.pause,
            resume_callback=self.resume,
            custom_exit_callback=None,  # No special cleanup needed on forced exit
            exit_on_second_int=True,
        )
        signal_handler.register()

        # Wait for verification task to complete if it exists
        if hasattr(self, '_verification_task') and self._verification_task and not self._verification_task.done():
            try:
                await self._verification_task
            except Exception:
                # Error already logged in the task
                pass

        try:
            self._log_agent_run()

            # Execute initial actions if provided
            if self.initial_actions:
                result = await self.multi_act(self.initial_actions, check_for_new_elements=False)
                self.state.last_result = result

            for step in range(max_steps):
                # Check if waiting for user input after Ctrl+C
                while self.state.paused:
                    await asyncio.sleep(0.5)
                    if self.state.stopped:
                        break

                # Check if we should stop due to too many failures
                if self.state.consecutive_failures >= self.settings.max_failures:
                    logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures')
                    break

                # Check control flags before each step
                if self.state.stopped:
                    logger.info('Agent stopped')
                    break

                while self.state.paused:
                    await asyncio.sleep(0.2)  # Small delay to prevent CPU spinning
                    if self.state.stopped:  # Allow stopping while paused
                        break

                if on_step_start is not None:
                    await on_step_start(self)

                step_info = AgentStepInfo(step_number=step, max_steps=max_steps)
                await self.step(step_info)

                if on_step_end is not None:
                    await on_step_end(self)

                if self.state.history.is_done():
                    if self.settings.validate_output and step < max_steps - 1:
                        if not await self._validate_output():
                            continue

                    await self.log_completion()
                    break
            else:
                logger.info('❌ Failed to complete task in maximum steps')

            return self.state.history
        except KeyboardInterrupt:
            # Already handled by our signal handler, but catch any direct KeyboardInterrupt as well
            logger.info('Got KeyboardInterrupt during execution, returning current history')
            return self.state.history
        finally:
            # Unregister signal handlers before cleanup
            signal_handler.unregister()

            self.telemetry.capture(
                AgentEndTelemetryEvent(
                    agent_id=self.state.agent_id,
                    is_done=self.state.history.is_done(),
                    success=self.state.history.is_successful(),
                    steps=self.state.n_steps,
                    max_steps_reached=self.state.n_steps >= max_steps,
                    errors=self.state.history.errors(),
                    total_input_tokens=self.state.history.total_input_tokens(),
                    total_duration_seconds=self.state.history.total_duration_seconds(),
                )
            )

            await self.close()

            if self.settings.generate_gif:
                output_path: str = 'agent_history.gif'
                if isinstance(self.settings.generate_gif, str):
                    output_path = self.settings.generate_gif

                create_history_gif(task=self.task, history=self.state.history, output_path=output_path)
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/controller/custom_controller.py:
--------------------------------------------------------------------------------
```python
import pdb
import pyperclip
from typing import Optional, Type, Callable, Dict, Any, Union, Awaitable, TypeVar
from pydantic import BaseModel
from browser_use.agent.views import ActionResult
from browser_use.browser.context import BrowserContext
from browser_use.controller.service import Controller, DoneAction
from browser_use.controller.registry.service import Registry, RegisteredAction
from main_content_extractor import MainContentExtractor
from browser_use.controller.views import (
    ClickElementAction,
    DoneAction,
    ExtractPageContentAction,
    GoToUrlAction,
    InputTextAction,
    OpenTabAction,
    ScrollAction,
    SearchGoogleAction,
    SendKeysAction,
    SwitchTabAction,
)
import logging
import inspect
import asyncio
import os
from langchain_core.language_models.chat_models import BaseChatModel
from browser_use.agent.views import ActionModel, ActionResult
from ..utils.mcp_client import create_tool_param_model, setup_mcp_client_and_tools
from browser_use.utils import time_execution_sync
logger = logging.getLogger(__name__)
Context = TypeVar('Context')
class CustomController(Controller):
def __init__(self, exclude_actions: list[str] = [],
output_model: Optional[Type[BaseModel]] = None,
ask_assistant_callback: Optional[Union[Callable[[str, BrowserContext], Dict[str, Any]], Callable[
[str, BrowserContext], Awaitable[Dict[str, Any]]]]] = None,
):
super().__init__(exclude_actions=exclude_actions, output_model=output_model)
self._register_custom_actions()
self.ask_assistant_callback = ask_assistant_callback
self.mcp_client = None
self.mcp_server_config = None
def _register_custom_actions(self):
"""Register all custom browser actions"""
@self.registry.action(
"When executing tasks, prioritize autonomous completion. However, if you encounter a definitive blocker "
"that prevents you from proceeding independently – such as needing credentials you don't possess, "
"requiring subjective human judgment, needing a physical action performed, encountering complex CAPTCHAs, "
"or facing limitations in your capabilities – you must request human assistance."
)
async def ask_for_assistant(query: str, browser: BrowserContext):
if self.ask_assistant_callback:
if inspect.iscoroutinefunction(self.ask_assistant_callback):
user_response = await self.ask_assistant_callback(query, browser)
else:
user_response = self.ask_assistant_callback(query, browser)
msg = f"AI ask: {query}. User response: {user_response['response']}"
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
else:
return ActionResult(extracted_content="Human cannot help you. Please try another way.",
include_in_memory=True)
@self.registry.action(
'Upload file to interactive element with file path ',
)
async def upload_file(index: int, path: str, browser: BrowserContext, available_file_paths: list[str]):
if path not in available_file_paths:
return ActionResult(error=f'File path {path} is not available')
if not os.path.exists(path):
return ActionResult(error=f'File {path} does not exist')
dom_el = await browser.get_dom_element_by_index(index)
file_upload_dom_el = dom_el.get_file_upload_element()
if file_upload_dom_el is None:
msg = f'No file upload element found at index {index}'
logger.info(msg)
return ActionResult(error=msg)
file_upload_el = await browser.get_locate_element(file_upload_dom_el)
if file_upload_el is None:
msg = f'No file upload element found at index {index}'
logger.info(msg)
return ActionResult(error=msg)
try:
await file_upload_el.set_input_files(path)
msg = f'Successfully uploaded file to index {index}'
logger.info(msg)
return ActionResult(extracted_content=msg, include_in_memory=True)
except Exception as e:
msg = f'Failed to upload file to index {index}: {str(e)}'
logger.info(msg)
return ActionResult(error=msg)
@time_execution_sync('--act')
async def act(
self,
action: ActionModel,
browser_context: Optional[BrowserContext] = None,
#
page_extraction_llm: Optional[BaseChatModel] = None,
sensitive_data: Optional[Dict[str, str]] = None,
available_file_paths: Optional[list[str]] = None,
#
context: Context | None = None,
) -> ActionResult:
"""Execute an action"""
try:
for action_name, params in action.model_dump(exclude_unset=True).items():
if params is not None:
if action_name.startswith("mcp"):
# This is an MCP tool registered via register_mcp_tools
logger.debug(f"Invoke MCP tool: {action_name}")
mcp_tool = self.registry.registry.actions.get(action_name).function
result = await mcp_tool.ainvoke(params)
else:
result = await self.registry.execute_action(
action_name,
params,
browser=browser_context,
page_extraction_llm=page_extraction_llm,
sensitive_data=sensitive_data,
available_file_paths=available_file_paths,
context=context,
)
if isinstance(result, str):
return ActionResult(extracted_content=result)
elif isinstance(result, ActionResult):
return result
elif result is None:
return ActionResult()
else:
raise ValueError(f'Invalid action result type: {type(result)} of {result}')
return ActionResult()
except Exception:
# Re-raise unchanged; a bare `raise` preserves the original traceback.
raise
async def setup_mcp_client(self, mcp_server_config: Optional[Dict[str, Any]] = None):
self.mcp_server_config = mcp_server_config
if self.mcp_server_config:
self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config)
self.register_mcp_tools()
def register_mcp_tools(self):
"""
Register the MCP tools used by this controller.
"""
if self.mcp_client:
for server_name in self.mcp_client.server_name_to_tools:
for tool in self.mcp_client.server_name_to_tools[server_name]:
tool_name = f"mcp.{server_name}.{tool.name}"
self.registry.registry.actions[tool_name] = RegisteredAction(
name=tool_name,
description=tool.description,
function=tool,
param_model=create_tool_param_model(tool),
)
logger.info(f"Add mcp tool: {tool_name}")
async def close_mcp_client(self):
if self.mcp_client:
await self.mcp_client.__aexit__(None, None, None)
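# Typical controller lifecycle (hedged sketch; config values are illustrative):
#
#   controller = CustomController(ask_assistant_callback=my_callback)
#   await controller.setup_mcp_client({"my_server": {"command": "npx", "args": ["..."]}})
#   result = await controller.act(action, browser_context=ctx)
#   await controller.close_mcp_client()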
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/mcp_client.py:
--------------------------------------------------------------------------------
```python
import inspect
import logging
import uuid
from datetime import date, datetime, time
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Type, Union, get_type_hints
from browser_use.controller.registry.views import ActionModel
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
from pydantic import BaseModel, Field, create_model
logger = logging.getLogger(__name__)
async def setup_mcp_client_and_tools(mcp_server_config: Dict[str, Any]) -> Optional[MultiServerMCPClient]:
"""
Initializes the MultiServerMCPClient and connects to the configured servers.
Returns:
The initialized and started MultiServerMCPClient instance, or None if no
configuration was provided or the client setup failed.
"""
logger.info("Initializing MultiServerMCPClient...")
if not mcp_server_config:
logger.error("No MCP server configuration provided.")
return None
try:
if "mcpServers" in mcp_server_config:
mcp_server_config = mcp_server_config["mcpServers"]
client = MultiServerMCPClient(mcp_server_config)
await client.__aenter__()
return client
except Exception as e:
logger.error(f"Failed to setup MCP client or fetch tools: {e}", exc_info=True)
return None
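# Expected config shape (illustrative; mirrors the standard MCP "mcpServers"
# layout, which this function also accepts without the wrapper key):
#
#   {
#       "mcpServers": {
#           "filesystem": {"command": "npx", "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]}
#       }
#   }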
def create_tool_param_model(tool: BaseTool) -> Type[BaseModel]:
"""Creates a Pydantic model from a LangChain tool's schema"""
# Get tool schema information
json_schema = tool.args_schema
tool_name = tool.name
# If the tool already has a schema defined, convert it to a new param_model
if json_schema is not None:
# Create new parameter model
params = {}
# Process properties if they exist
if 'properties' in json_schema:
# Find required fields
required_fields: Set[str] = set(json_schema.get('required', []))
for prop_name, prop_details in json_schema['properties'].items():
field_type = resolve_type(prop_details, f"{tool_name}_{prop_name}")
# Check if parameter is required
is_required = prop_name in required_fields
# Get default value and description
default_value = prop_details.get('default', ... if is_required else None)
description = prop_details.get('description', '')
# Add field constraints
field_kwargs = {'default': default_value}
if description:
field_kwargs['description'] = description
# Add additional constraints if present
if 'minimum' in prop_details:
field_kwargs['ge'] = prop_details['minimum']
if 'maximum' in prop_details:
field_kwargs['le'] = prop_details['maximum']
if 'minLength' in prop_details:
field_kwargs['min_length'] = prop_details['minLength']
if 'maxLength' in prop_details:
field_kwargs['max_length'] = prop_details['maxLength']
if 'pattern' in prop_details:
field_kwargs['pattern'] = prop_details['pattern']
# Add to parameters dictionary
params[prop_name] = (field_type, Field(**field_kwargs))
return create_model(
f'{tool_name}_parameters',
__base__=ActionModel,
**params, # type: ignore
)
# If no schema is defined, extract parameters from the _run method
run_method = tool._run
sig = inspect.signature(run_method)
# Get type hints for better type information
try:
type_hints = get_type_hints(run_method)
except Exception:
type_hints = {}
params = {}
for name, param in sig.parameters.items():
# Skip 'self' parameter and any other parameters you want to exclude
if name == 'self':
continue
# Get annotation from type hints if available, otherwise from signature
annotation = type_hints.get(name, param.annotation)
if annotation == inspect.Parameter.empty:
annotation = Any
# Use default value if available, otherwise make it required
if param.default != param.empty:
params[name] = (annotation, param.default)
else:
params[name] = (annotation, ...)
return create_model(
f'{tool_name}_parameters',
__base__=ActionModel,
**params, # type: ignore
)
def resolve_type(prop_details: Dict[str, Any], prefix: str = "") -> Any:
"""Recursively resolves JSON schema type to Python/Pydantic type"""
# Handle reference types
if '$ref' in prop_details:
# In a real application, reference resolution would be needed
return Any
# Basic type mapping
type_mapping = {
'string': str,
'integer': int,
'number': float,
'boolean': bool,
'array': List,
'object': Dict,
'null': type(None),
}
# Handle formatted strings
if prop_details.get('type') == 'string' and 'format' in prop_details:
format_mapping = {
'date-time': datetime,
'date': date,
'time': time,
'email': str,
'uri': str,
'url': str,
'uuid': uuid.UUID,
'binary': bytes,
}
return format_mapping.get(prop_details['format'], str)
# Handle enum types
if 'enum' in prop_details:
enum_values = prop_details['enum']
# Create dynamic enum class with safe names
enum_dict = {}
for i, v in enumerate(enum_values):
# Ensure enum names are valid Python identifiers
if isinstance(v, str):
key = v.upper().replace(' ', '_').replace('-', '_')
if not key.isidentifier():
key = f"VALUE_{i}"
else:
key = f"VALUE_{i}"
enum_dict[key] = v
# Only create enum if we have values
if enum_dict:
return Enum(f"{prefix}_Enum", enum_dict)
return str # Fallback
# Handle array types
if prop_details.get('type') == 'array' and 'items' in prop_details:
item_type = resolve_type(prop_details['items'], f"{prefix}_item")
return List[item_type] # type: ignore
# Handle object types with properties
if prop_details.get('type') == 'object' and 'properties' in prop_details:
nested_params = {}
for nested_name, nested_details in prop_details['properties'].items():
nested_type = resolve_type(nested_details, f"{prefix}_{nested_name}")
# Get required field info
required_fields = prop_details.get('required', [])
is_required = nested_name in required_fields
default_value = nested_details.get('default', ... if is_required else None)
description = nested_details.get('description', '')
field_kwargs = {'default': default_value}
if description:
field_kwargs['description'] = description
nested_params[nested_name] = (nested_type, Field(**field_kwargs))
# Create nested model
nested_model = create_model(f"{prefix}_Model", **nested_params)
return nested_model
# Handle union types (oneOf, anyOf)
if 'oneOf' in prop_details or 'anyOf' in prop_details:
union_schema = prop_details.get('oneOf') or prop_details.get('anyOf')
union_types = []
for i, t in enumerate(union_schema):
union_types.append(resolve_type(t, f"{prefix}_{i}"))
if union_types:
return Union.__getitem__(tuple(union_types)) # type: ignore
return Any
# Handle allOf (intersection types)
if 'allOf' in prop_details:
nested_params = {}
for i, schema_part in enumerate(prop_details['allOf']):
if 'properties' in schema_part:
for nested_name, nested_details in schema_part['properties'].items():
nested_type = resolve_type(nested_details, f"{prefix}_allOf_{i}_{nested_name}")
# Check if required
required_fields = schema_part.get('required', [])
is_required = nested_name in required_fields
nested_params[nested_name] = (nested_type, ... if is_required else None)
# Create composite model
if nested_params:
composite_model = create_model(f"{prefix}_CompositeModel", **nested_params)
return composite_model
return Dict
# Default to basic types
schema_type = prop_details.get('type', 'string')
if isinstance(schema_type, list):
# Handle multiple types (e.g., ["string", "null"])
non_null_types = [t for t in schema_type if t != 'null']
if non_null_types:
primary_type = type_mapping.get(non_null_types[0], Any)
if 'null' in schema_type:
return Optional[primary_type] # type: ignore
return primary_type
return Any
return type_mapping.get(schema_type, Any)
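# Illustrative mappings produced by resolve_type:
#   {"type": "string", "format": "uuid"}             -> uuid.UUID
#   {"type": "array", "items": {"type": "integer"}}  -> List[int]
#   {"type": ["string", "null"]}                     -> Optional[str]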
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/llm_provider.py:
--------------------------------------------------------------------------------
```python
import os
from typing import Any, Optional
from langchain_anthropic import ChatAnthropic
from langchain_aws import ChatBedrock
from langchain_core.language_models.base import LanguageModelInput
from langchain_core.messages import AIMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ibm import ChatWatsonx
from langchain_mistralai import ChatMistralAI
from langchain_ollama import ChatOllama
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from openai import OpenAI
from ..utils import config
class DeepSeekR1ChatOpenAI(ChatOpenAI):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.client = OpenAI(
base_url=kwargs.get("base_url"),
api_key=kwargs.get("api_key")
)
async def ainvoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[list[str]] = None,
**kwargs: Any,
) -> AIMessage:
message_history = []
for input_ in input:
if isinstance(input_, SystemMessage):
message_history.append({"role": "system", "content": input_.content})
elif isinstance(input_, AIMessage):
message_history.append({"role": "assistant", "content": input_.content})
else:
message_history.append({"role": "user", "content": input_.content})
response = self.client.chat.completions.create(
model=self.model_name,
messages=message_history
)
reasoning_content = response.choices[0].message.reasoning_content
content = response.choices[0].message.content
return AIMessage(content=content, reasoning_content=reasoning_content)
def invoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[list[str]] = None,
**kwargs: Any,
) -> AIMessage:
message_history = []
for input_ in input:
if isinstance(input_, SystemMessage):
message_history.append({"role": "system", "content": input_.content})
elif isinstance(input_, AIMessage):
message_history.append({"role": "assistant", "content": input_.content})
else:
message_history.append({"role": "user", "content": input_.content})
response = self.client.chat.completions.create(
model=self.model_name,
messages=message_history
)
reasoning_content = response.choices[0].message.reasoning_content
content = response.choices[0].message.content
return AIMessage(content=content, reasoning_content=reasoning_content)
class DeepSeekR1ChatOllama(ChatOllama):
async def ainvoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[list[str]] = None,
**kwargs: Any,
) -> AIMessage:
org_ai_message = await super().ainvoke(input=input)
org_content = org_ai_message.content
# Separate the <think>...</think> reasoning block from the final answer;
# fall back to the raw content if the model omitted the tags.
if "</think>" in org_content:
reasoning_content = org_content.split("</think>")[0].replace("<think>", "")
content = org_content.split("</think>")[1]
else:
reasoning_content = ""
content = org_content
if "**JSON Response:**" in content:
content = content.split("**JSON Response:**")[-1]
return AIMessage(content=content, reasoning_content=reasoning_content)
def invoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[list[str]] = None,
**kwargs: Any,
) -> AIMessage:
org_ai_message = super().invoke(input=input)
org_content = org_ai_message.content
# Same split as in ainvoke, guarded against missing tags.
if "</think>" in org_content:
reasoning_content = org_content.split("</think>")[0].replace("<think>", "")
content = org_content.split("</think>")[1]
else:
reasoning_content = ""
content = org_content
if "**JSON Response:**" in content:
content = content.split("**JSON Response:**")[-1]
return AIMessage(content=content, reasoning_content=reasoning_content)
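# DeepSeek-R1 style models emit e.g. "<think>step-by-step reasoning</think>final answer";
# the wrappers above strip the reasoning so downstream code sees only the final
# answer, while the reasoning is preserved on `AIMessage.reasoning_content`.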
def get_llm_model(provider: str, **kwargs):
"""
Instantiate a LangChain chat model for the given provider.
:param provider: provider name, e.g. "openai", "anthropic", "ollama".
:param kwargs: model_name, temperature, base_url, api_key, and other provider-specific options.
:return: a configured chat model instance.
"""
if provider not in ["ollama", "bedrock"]:
env_var = f"{provider.upper()}_API_KEY"
api_key = kwargs.get("api_key", "") or os.getenv(env_var, "")
if not api_key:
provider_display = config.PROVIDER_DISPLAY_NAMES.get(provider, provider.upper())
error_msg = f"💥 {provider_display} API key not found! 🔑 Please set the `{env_var}` environment variable or provide it in the UI."
raise ValueError(error_msg)
kwargs["api_key"] = api_key
if provider == "anthropic":
if not kwargs.get("base_url", ""):
base_url = "https://api.anthropic.com"
else:
base_url = kwargs.get("base_url")
return ChatAnthropic(
model=kwargs.get("model_name", "claude-3-5-sonnet-20241022"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == 'mistral':
if not kwargs.get("base_url", ""):
base_url = os.getenv("MISTRAL_ENDPOINT", "https://api.mistral.ai/v1")
else:
base_url = kwargs.get("base_url")
if not kwargs.get("api_key", ""):
api_key = os.getenv("MISTRAL_API_KEY", "")
else:
api_key = kwargs.get("api_key")
return ChatMistralAI(
model=kwargs.get("model_name", "mistral-large-latest"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "openai":
if not kwargs.get("base_url", ""):
base_url = os.getenv("OPENAI_ENDPOINT", "https://api.openai.com/v1")
else:
base_url = kwargs.get("base_url")
return ChatOpenAI(
model=kwargs.get("model_name", "gpt-4o"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "deepseek":
if not kwargs.get("base_url", ""):
base_url = os.getenv("DEEPSEEK_ENDPOINT", "")
else:
base_url = kwargs.get("base_url")
if kwargs.get("model_name", "deepseek-chat") == "deepseek-reasoner":
return DeepSeekR1ChatOpenAI(
model=kwargs.get("model_name", "deepseek-reasoner"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
else:
return ChatOpenAI(
model=kwargs.get("model_name", "deepseek-chat"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "google":
return ChatGoogleGenerativeAI(
model=kwargs.get("model_name", "gemini-2.0-flash-exp"),
temperature=kwargs.get("temperature", 0.0),
api_key=api_key,
)
elif provider == "ollama":
if not kwargs.get("base_url", ""):
base_url = os.getenv("OLLAMA_ENDPOINT", "http://localhost:11434")
else:
base_url = kwargs.get("base_url")
if "deepseek-r1" in kwargs.get("model_name", "qwen2.5:7b"):
return DeepSeekR1ChatOllama(
model=kwargs.get("model_name", "deepseek-r1:14b"),
temperature=kwargs.get("temperature", 0.0),
num_ctx=kwargs.get("num_ctx", 32000),
base_url=base_url,
)
else:
return ChatOllama(
model=kwargs.get("model_name", "qwen2.5:7b"),
temperature=kwargs.get("temperature", 0.0),
num_ctx=kwargs.get("num_ctx", 32000),
num_predict=kwargs.get("num_predict", 1024),
base_url=base_url,
)
elif provider == "azure_openai":
if not kwargs.get("base_url", ""):
base_url = os.getenv("AZURE_OPENAI_ENDPOINT", "")
else:
base_url = kwargs.get("base_url")
api_version = kwargs.get("api_version", "") or os.getenv("AZURE_OPENAI_API_VERSION", "2025-01-01-preview")
return AzureChatOpenAI(
model=kwargs.get("model_name", "gpt-4o"),
temperature=kwargs.get("temperature", 0.0),
api_version=api_version,
azure_endpoint=base_url,
api_key=api_key,
)
elif provider == "alibaba":
if not kwargs.get("base_url", ""):
base_url = os.getenv("ALIBABA_ENDPOINT", "https://dashscope.aliyuncs.com/compatible-mode/v1")
else:
base_url = kwargs.get("base_url")
return ChatOpenAI(
model=kwargs.get("model_name", "qwen-plus"),
temperature=kwargs.get("temperature", 0.0),
base_url=base_url,
api_key=api_key,
)
elif provider == "ibm":
parameters = {
"temperature": kwargs.get("temperature", 0.0),
"max_tokens": kwargs.get("num_ctx", 32000)
}
if not kwargs.get("base_url", ""):
base_url = os.getenv("IBM_ENDPOINT", "https://us-south.ml.cloud.ibm.com")
else:
base_url = kwargs.get("base_url")
return ChatWatsonx(
model_id=kwargs.get("model_name", "ibm/granite-vision-3.1-2b-preview"),
url=base_url,
project_id=os.getenv("IBM_PROJECT_ID"),
apikey=os.getenv("IBM_API_KEY"),
params=parameters
)
elif provider == "moonshot":
return ChatOpenAI(
model=kwargs.get("model_name", "moonshot-v1-32k-vision-preview"),
temperature=kwargs.get("temperature", 0.0),
base_url=os.getenv("MOONSHOT_ENDPOINT"),
api_key=os.getenv("MOONSHOT_API_KEY"),
)
elif provider == "unbound":
return ChatOpenAI(
model=kwargs.get("model_name", "gpt-4o-mini"),
temperature=kwargs.get("temperature", 0.0),
base_url=os.getenv("UNBOUND_ENDPOINT", "https://api.getunbound.ai"),
api_key=api_key,
)
elif provider == "siliconflow":
if not kwargs.get("api_key", ""):
api_key = os.getenv("SiliconFLOW_API_KEY", "")
else:
api_key = kwargs.get("api_key")
if not kwargs.get("base_url", ""):
base_url = os.getenv("SiliconFLOW_ENDPOINT", "")
else:
base_url = kwargs.get("base_url")
return ChatOpenAI(
api_key=api_key,
base_url=base_url,
model_name=kwargs.get("model_name", "Qwen/QwQ-32B"),
temperature=kwargs.get("temperature", 0.0),
)
else:
raise ValueError(f"Unsupported provider: {provider}")
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/config.py:
--------------------------------------------------------------------------------
```python
from typing import Any, Dict, List, Optional, Union
from pydantic import Field, SecretStr, field_validator, ValidationInfo
from pydantic_settings import BaseSettings, SettingsConfigDict
class LLMSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_LLM_")
provider: str = Field(default="google", env="PROVIDER")
model_name: str = Field(default="gemini-2.5-flash-preview-04-17", env="MODEL_NAME")
temperature: float = Field(default=0.0, env="TEMPERATURE")
base_url: Optional[str] = Field(default=None, env="BASE_URL")
api_key: Optional[SecretStr] = Field(default=None, env="API_KEY") # Generic API key
# Provider-specific API keys
openai_api_key: Optional[SecretStr] = Field(default=None, env="OPENAI_API_KEY")
anthropic_api_key: Optional[SecretStr] = Field(default=None, env="ANTHROPIC_API_KEY")
google_api_key: Optional[SecretStr] = Field(default=None, env="GOOGLE_API_KEY")
azure_openai_api_key: Optional[SecretStr] = Field(default=None, env="AZURE_OPENAI_API_KEY")
deepseek_api_key: Optional[SecretStr] = Field(default=None, env="DEEPSEEK_API_KEY")
mistral_api_key: Optional[SecretStr] = Field(default=None, env="MISTRAL_API_KEY")
openrouter_api_key: Optional[SecretStr] = Field(default=None, env="OPENROUTER_API_KEY")
alibaba_api_key: Optional[SecretStr] = Field(default=None, env="ALIBABA_API_KEY")
moonshot_api_key: Optional[SecretStr] = Field(default=None, env="MOONSHOT_API_KEY")
unbound_api_key: Optional[SecretStr] = Field(default=None, env="UNBOUND_API_KEY")
# Provider-specific endpoints
openai_endpoint: Optional[str] = Field(default=None, env="OPENAI_ENDPOINT")
anthropic_endpoint: Optional[str] = Field(default=None, env="ANTHROPIC_ENDPOINT")
azure_openai_endpoint: Optional[str] = Field(default=None, env="AZURE_OPENAI_ENDPOINT")
azure_openai_api_version: str = Field(default="2025-01-01-preview", env="AZURE_OPENAI_API_VERSION")
deepseek_endpoint: Optional[str] = Field(default=None, env="DEEPSEEK_ENDPOINT")
mistral_endpoint: Optional[str] = Field(default=None, env="MISTRAL_ENDPOINT")
ollama_endpoint: str = Field(default="http://localhost:11434", env="OLLAMA_ENDPOINT")
openrouter_endpoint: str = Field(default="https://openrouter.ai/api/v1", env="OPENROUTER_ENDPOINT")
alibaba_endpoint: Optional[str] = Field(default=None, env="ALIBABA_ENDPOINT")
moonshot_endpoint: Optional[str] = Field(default=None, env="MOONSHOT_ENDPOINT")
unbound_endpoint: Optional[str] = Field(default=None, env="UNBOUND_ENDPOINT")
ollama_num_ctx: Optional[int] = Field(default=32000, env="OLLAMA_NUM_CTX")
ollama_num_predict: Optional[int] = Field(default=1024, env="OLLAMA_NUM_PREDICT")
# Planner LLM settings (optional, defaults to main LLM if not set)
planner_provider: Optional[str] = Field(default=None, env="PLANNER_PROVIDER")
planner_model_name: Optional[str] = Field(default=None, env="PLANNER_MODEL_NAME")
planner_temperature: Optional[float] = Field(default=None, env="PLANNER_TEMPERATURE")
planner_base_url: Optional[str] = Field(default=None, env="PLANNER_BASE_URL")
planner_api_key: Optional[SecretStr] = Field(default=None, env="PLANNER_API_KEY")
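# With env_prefix "MCP_LLM_", these fields resolve from environment variables
# such as MCP_LLM_PROVIDER, MCP_LLM_MODEL_NAME, MCP_LLM_TEMPERATURE.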
class BrowserSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_BROWSER_")
headless: bool = Field(default=False, env="HEADLESS") # General headless
disable_security: bool = Field(default=False, env="DISABLE_SECURITY") # General disable security
binary_path: Optional[str] = Field(default=None, env="BINARY_PATH")
user_data_dir: Optional[str] = Field(default=None, env="USER_DATA_DIR")
window_width: int = Field(default=1280, env="WINDOW_WIDTH")
window_height: int = Field(default=1080, env="WINDOW_HEIGHT")
use_own_browser: bool = Field(default=False, env="USE_OWN_BROWSER")
cdp_url: Optional[str] = Field(default=None, env="CDP_URL")
wss_url: Optional[str] = Field(default=None, env="WSS_URL") # For CDP connection if needed
keep_open: bool = Field(default=False, env="KEEP_OPEN") # Server-managed browser persistence
trace_path: Optional[str] = Field(default=None, env="TRACE_PATH")
class AgentToolSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_AGENT_TOOL_")
max_steps: int = Field(default=100, env="MAX_STEPS")
max_actions_per_step: int = Field(default=5, env="MAX_ACTIONS_PER_STEP")
tool_calling_method: Optional[str] = Field(default="auto", env="TOOL_CALLING_METHOD")
max_input_tokens: Optional[int] = Field(default=128000, env="MAX_INPUT_TOKENS")
use_vision: bool = Field(default=True, env="USE_VISION")
# Browser settings specific to this tool, can override general MCP_BROWSER_ settings
headless: Optional[bool] = Field(default=None, env="HEADLESS")
disable_security: Optional[bool] = Field(default=None, env="DISABLE_SECURITY")
enable_recording: bool = Field(default=False, env="ENABLE_RECORDING")
save_recording_path: Optional[str] = Field(default=None, env="SAVE_RECORDING_PATH") # e.g. ./tmp/recordings
history_path: Optional[str] = Field(default=None, env="HISTORY_PATH") # e.g. ./tmp/agent_history
class DeepResearchToolSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_RESEARCH_TOOL_")
max_parallel_browsers: int = Field(default=3, env="MAX_PARALLEL_BROWSERS")
save_dir: Optional[str] = Field(default=None, env="SAVE_DIR") # Base dir, task_id will be appended. Optional now.
class PathSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_PATHS_")
downloads: Optional[str] = Field(default=None, env="DOWNLOADS") # e.g. ./tmp/downloads
class ServerSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_SERVER_")
log_file: Optional[str] = Field(default=None, env="LOG_FILE")
logging_level: str = Field(default="ERROR", env="LOGGING_LEVEL")
anonymized_telemetry: bool = Field(default=True, env="ANONYMIZED_TELEMETRY")
mcp_config: Optional[Dict[str, Any]] = Field(default=None, env="MCP_CONFIG") # For controller's MCP client
class AppSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="MCP_", extra='ignore') # Root prefix
llm: LLMSettings = Field(default_factory=LLMSettings)
browser: BrowserSettings = Field(default_factory=BrowserSettings)
agent_tool: AgentToolSettings = Field(default_factory=AgentToolSettings)
research_tool: DeepResearchToolSettings = Field(default_factory=DeepResearchToolSettings)
paths: PathSettings = Field(default_factory=PathSettings)
server: ServerSettings = Field(default_factory=ServerSettings)
@field_validator('server', 'llm', 'browser', 'agent_tool', 'research_tool', 'paths', mode='before')
@classmethod
def ensure_nested_defaults(cls, v: Any) -> Any:
# This ensures that even if MCP_SERVER_LOG_FILE is set but MCP_SERVER is not,
# the ServerSettings object is still created.
# Pydantic-settings usually handles this, but being explicit can help.
if v is None:
return {}
return v
def get_api_key_for_provider(self, provider_name: Optional[str], is_planner: bool = False) -> Optional[str]:
"""Retrieves the API key for a given provider, checking generic, then specific."""
llm_settings_to_use = self.llm
provider_to_use = provider_name if provider_name else (self.llm.planner_provider if is_planner else self.llm.provider)
if is_planner:
if self.llm.planner_api_key:
return self.llm.planner_api_key.get_secret_value()
# Fallback to main LLM settings if planner-specific key is not set, but provider is
if self.llm.planner_provider and not self.llm.planner_api_key:
llm_settings_to_use = self.llm # Check main llm settings for this provider
# if no planner provider, it will use main llm provider and its key
if not provider_to_use: # Should not happen if called correctly
return None
# Check generic API key first for the relevant LLM settings (main or planner if planner_api_key was set)
if not is_planner and llm_settings_to_use.api_key: # only main LLM has generic api_key
return llm_settings_to_use.api_key.get_secret_value()
provider_specific_key_name = f"{provider_to_use.lower()}_api_key"
if hasattr(llm_settings_to_use, provider_specific_key_name):
key_val = getattr(llm_settings_to_use, provider_specific_key_name)
if key_val and isinstance(key_val, SecretStr):
return key_val.get_secret_value()
return None
def get_endpoint_for_provider(self, provider_name: Optional[str], is_planner: bool = False) -> Optional[str]:
"""Retrieves the endpoint for a given provider."""
llm_settings_to_use = self.llm
provider_to_use = provider_name if provider_name else (self.llm.planner_provider if is_planner else self.llm.provider)
if is_planner:
if self.llm.planner_base_url:
return self.llm.planner_base_url
if self.llm.planner_provider and not self.llm.planner_base_url:
llm_settings_to_use = self.llm # Check main llm settings for this provider
if not provider_to_use:
return None
if not is_planner and llm_settings_to_use.base_url: # only main LLM has generic base_url
return llm_settings_to_use.base_url
provider_specific_endpoint_name = f"{provider_to_use.lower()}_endpoint"
if hasattr(llm_settings_to_use, provider_specific_endpoint_name):
return getattr(llm_settings_to_use, provider_specific_endpoint_name)
return None
def get_llm_config(self, is_planner: bool = False) -> Dict[str, Any]:
"""Returns a dictionary of LLM settings suitable for llm_provider.get_llm_model."""
provider = self.llm.planner_provider if is_planner and self.llm.planner_provider else self.llm.provider
model_name = self.llm.planner_model_name if is_planner and self.llm.planner_model_name else self.llm.model_name
temperature = self.llm.planner_temperature if is_planner and self.llm.planner_temperature is not None else self.llm.temperature
api_key = self.get_api_key_for_provider(provider, is_planner=is_planner)
base_url = self.get_endpoint_for_provider(provider, is_planner=is_planner)
config = {
"provider": provider,
"model_name": model_name,
"temperature": temperature,
"api_key": api_key,
"base_url": base_url,
"use_vision": self.agent_tool.use_vision if not is_planner else False, # Planners typically don't need vision
"tool_calling_method": self.agent_tool.tool_calling_method if not is_planner else "auto",
"max_input_tokens": self.agent_tool.max_input_tokens if not is_planner else None,
}
if provider == "azure_openai":
config["azure_openai_api_version"] = self.llm.azure_openai_api_version
elif provider == "ollama":
config["ollama_num_ctx"] = self.llm.ollama_num_ctx
config["ollama_num_predict"] = self.llm.ollama_num_predict
elif provider == "openrouter":
config["provider"] = "openai"
return config
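# Illustrative result with the defaults above (no env overrides set):
#   {"provider": "google", "model_name": "gemini-2.5-flash-preview-04-17",
#    "temperature": 0.0, "api_key": None, "base_url": None, "use_vision": True,
#    "tool_calling_method": "auto", "max_input_tokens": 128000}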
# Global settings instance, to be imported by other modules
settings = AppSettings()
# Example usage (for testing this file directly):
if __name__ == "__main__":
try:
print("Loaded AppSettings:")
print(settings.model_dump_json(indent=2))
print(f"\nLLM API Key for main provider ({settings.llm.provider}): {settings.get_api_key_for_provider(settings.llm.provider)}")
if settings.llm.planner_provider:
print(f"LLM API Key for planner provider ({settings.llm.planner_provider}): {settings.get_api_key_for_provider(settings.llm.planner_provider, is_planner=True)}")
print("\nMain LLM Config for get_llm_model:")
print(settings.get_llm_config())
if settings.llm.planner_provider:
print("\nPlanner LLM Config for get_llm_model:")
print(settings.get_llm_config(is_planner=True))
except Exception as e:
print(f"Error during settings load or test: {e}")
import os
print("MCP_RESEARCH_TOOL_SAVE_DIR:", os.getenv("MCP_RESEARCH_TOOL_SAVE_DIR"))
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/cli.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import logging
import os
import sys
import traceback
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
import typer
from dotenv import load_dotenv
from .config import AppSettings, settings as global_settings # Import AppSettings and the global instance
# Import from _internal
from ._internal.agent.browser_use.browser_use_agent import BrowserUseAgent, AgentHistoryList
from ._internal.agent.deep_research.deep_research_agent import DeepResearchAgent
from ._internal.browser.custom_browser import CustomBrowser
from ._internal.browser.custom_context import (
CustomBrowserContext,
CustomBrowserContextConfig,
)
from ._internal.controller.custom_controller import CustomController
from ._internal.utils import llm_provider as internal_llm_provider
from browser_use.browser.browser import BrowserConfig
from browser_use.agent.views import AgentOutput
from browser_use.browser.views import BrowserState
app = typer.Typer(name="mcp-browser-cli", help="CLI for mcp-browser-use tools.")
logger = logging.getLogger("mcp_browser_cli")
class CLIState:
settings: Optional[AppSettings] = None
cli_state = CLIState()
def setup_logging(level_str: str, log_file: Optional[str]):
numeric_level = getattr(logging, level_str.upper(), logging.INFO)
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(
level=numeric_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
filename=log_file if log_file else None,
filemode="a" if log_file else None,
force=True
)
@app.callback()
def main_callback(
ctx: typer.Context,
env_file: Optional[Path] = typer.Option(
None, "--env-file", "-e", help="Path to .env file to load.", exists=True, dir_okay=False, resolve_path=True
),
log_level: Optional[str] = typer.Option(
None, "--log-level", "-l", help="Override logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)."
)
):
"""
MCP Browser Use CLI. Settings are loaded from environment variables.
You can use a .env file for convenience.
"""
if env_file:
load_dotenv(env_file, override=True)
logger.info(f"Loaded environment variables from: {env_file}")
# Reload settings after .env might have been loaded and to apply overrides
try:
cli_state.settings = AppSettings()
except Exception as e:
# This can happen if mandatory fields (like MCP_RESEARCH_TOOL_SAVE_DIR) are not set
sys.stderr.write(f"Error loading application settings: {e}\n")
sys.stderr.write("Please ensure all mandatory environment variables are set (e.g., MCP_RESEARCH_TOOL_SAVE_DIR).\n")
raise typer.Exit(code=1)
# Setup logging based on final settings (env file, then env vars, then CLI override)
final_log_level = log_level if log_level else cli_state.settings.server.logging_level
final_log_file = cli_state.settings.server.log_file
setup_logging(final_log_level, final_log_file)
logger.info(f"CLI initialized. Effective log level: {final_log_level.upper()}")
if not cli_state.settings: # Should not happen if AppSettings() worked
logger.error("Failed to load application settings.")
raise typer.Exit(code=1)
async def cli_ask_human_callback(query: str, browser_context: Any) -> Dict[str, Any]:
"""Callback for agent to ask human for input via CLI."""
# browser_context is part of the signature from browser-use, might not be needed here
print(typer.style(f"\n🤖 AGENT ASKS: {query}", fg=typer.colors.YELLOW))
response_text = typer.prompt(typer.style("Your response", fg=typer.colors.CYAN))
return {"response": response_text}
def cli_on_step_callback(browser_state: BrowserState, agent_output: AgentOutput, step_num: int):
"""CLI callback for BrowserUseAgent steps."""
print(typer.style(f"\n--- Step {step_num} ---", fg=typer.colors.BLUE, bold=True))
# Print current state if available
if hasattr(agent_output, "current_state") and agent_output.current_state:
print(typer.style("🧠 Agent State:", fg=typer.colors.MAGENTA))
print(agent_output.current_state)
# Print actions
if hasattr(agent_output, "action") and agent_output.action:
print(typer.style("🎬 Actions:", fg=typer.colors.GREEN))
for action in agent_output.action:
# Try to get action_type and action_input if present, else print the action itself
action_type = getattr(action, "action_type", None)
action_input = getattr(action, "action_input", None)
if action_type is not None or action_input is not None:
print(f" - {action_type or 'Unknown action'}: {action_input or ''}")
else:
print(f" - {action}")
# Optionally print observation if present in browser_state
if hasattr(browser_state, "observation") and browser_state.observation:
obs = browser_state.observation
print(typer.style("👀 Observation:", fg=typer.colors.CYAN))
print(str(obs)[:200] + "..." if obs and len(str(obs)) > 200 else obs)
async def _run_browser_agent_logic_cli(task_str: str, current_settings: AppSettings) -> str:
logger.info(f"CLI: Starting run_browser_agent task: {task_str[:100]}...")
agent_task_id = str(uuid.uuid4())
final_result = "Error: Agent execution failed."
browser_instance: Optional[CustomBrowser] = None
context_instance: Optional[CustomBrowserContext] = None
controller_instance: Optional[CustomController] = None
try:
# LLM Setup
main_llm_config = current_settings.get_llm_config()
main_llm = internal_llm_provider.get_llm_model(**main_llm_config)
planner_llm = None
if current_settings.llm.planner_provider and current_settings.llm.planner_model_name:
planner_llm_config = current_settings.get_llm_config(is_planner=True)
planner_llm = internal_llm_provider.get_llm_model(**planner_llm_config)
# Controller Setup
controller_instance = CustomController(ask_assistant_callback=cli_ask_human_callback)
if current_settings.server.mcp_config:
mcp_dict_config = current_settings.server.mcp_config
if isinstance(current_settings.server.mcp_config, str):
mcp_dict_config = json.loads(current_settings.server.mcp_config)
await controller_instance.setup_mcp_client(mcp_dict_config)
# Browser and Context Setup
agent_headless_override = current_settings.agent_tool.headless
browser_headless = agent_headless_override if agent_headless_override is not None else current_settings.browser.headless
agent_disable_security_override = current_settings.agent_tool.disable_security
browser_disable_security = agent_disable_security_override if agent_disable_security_override is not None else current_settings.browser.disable_security
if current_settings.browser.use_own_browser and current_settings.browser.cdp_url:
browser_cfg = BrowserConfig(cdp_url=current_settings.browser.cdp_url, wss_url=current_settings.browser.wss_url, user_data_dir=current_settings.browser.user_data_dir)
else:
browser_cfg = BrowserConfig(
headless=browser_headless,
disable_security=browser_disable_security,
browser_binary_path=current_settings.browser.binary_path,
user_data_dir=current_settings.browser.user_data_dir,
window_width=current_settings.browser.window_width,
window_height=current_settings.browser.window_height,
)
browser_instance = CustomBrowser(config=browser_cfg)
context_cfg = CustomBrowserContextConfig(
trace_path=current_settings.browser.trace_path,
save_downloads_path=current_settings.paths.downloads,
save_recording_path=current_settings.agent_tool.save_recording_path if current_settings.agent_tool.enable_recording else None,
force_new_context=True # CLI always gets a new context
)
context_instance = await browser_instance.new_context(config=context_cfg)
agent_history_json_file = None
task_history_base_path = current_settings.agent_tool.history_path
if task_history_base_path:
task_specific_history_dir = Path(task_history_base_path) / agent_task_id
task_specific_history_dir.mkdir(parents=True, exist_ok=True)
agent_history_json_file = str(task_specific_history_dir / f"{agent_task_id}.json")
logger.info(f"Agent history will be saved to: {agent_history_json_file}")
# Agent Instantiation
agent_instance = BrowserUseAgent(
task=task_str, llm=main_llm,
browser=browser_instance, browser_context=context_instance, controller=controller_instance,
planner_llm=planner_llm,
max_actions_per_step=current_settings.agent_tool.max_actions_per_step,
use_vision=current_settings.agent_tool.use_vision,
register_new_step_callback=cli_on_step_callback,
)
# Run Agent
history: AgentHistoryList = await agent_instance.run(max_steps=current_settings.agent_tool.max_steps)
if agent_history_json_file:
agent_instance.save_history(agent_history_json_file)
final_result = history.final_result() or "Agent finished without a final result."
logger.info(f"CLI Agent task {agent_task_id} completed.")
except Exception as e:
logger.error(f"CLI Error in run_browser_agent: {e}\n{traceback.format_exc()}")
final_result = f"Error: {e}"
finally:
if context_instance: await context_instance.close()
if browser_instance and not current_settings.browser.use_own_browser: await browser_instance.close()  # Only close if we launched it
if controller_instance: await controller_instance.close_mcp_client()
return final_result
async def _run_deep_research_logic_cli(research_task_str: str, max_parallel_browsers_override: Optional[int], current_settings: AppSettings) -> str:
logger.info(f"CLI: Starting run_deep_research task: {research_task_str[:100]}...")
task_id = str(uuid.uuid4())
report_content = "Error: Deep research failed."
try:
main_llm_config = current_settings.get_llm_config()
research_llm = internal_llm_provider.get_llm_model(**main_llm_config)
dr_browser_cfg = {
"headless": current_settings.browser.headless,
"disable_security": current_settings.browser.disable_security,
"browser_binary_path": current_settings.browser.binary_path,
"user_data_dir": current_settings.browser.user_data_dir,
"window_width": current_settings.browser.window_width,
"window_height": current_settings.browser.window_height,
"trace_path": current_settings.browser.trace_path,
"save_downloads_path": current_settings.paths.downloads,
}
if current_settings.browser.use_own_browser and current_settings.browser.cdp_url:
dr_browser_cfg["cdp_url"] = current_settings.browser.cdp_url
dr_browser_cfg["wss_url"] = current_settings.browser.wss_url
mcp_server_config_for_agent = None
if current_settings.server.mcp_config:
mcp_server_config_for_agent = current_settings.server.mcp_config
if isinstance(current_settings.server.mcp_config, str):
mcp_server_config_for_agent = json.loads(current_settings.server.mcp_config)
agent_instance = DeepResearchAgent(
llm=research_llm, browser_config=dr_browser_cfg,
mcp_server_config=mcp_server_config_for_agent,
)
current_max_parallel_browsers = max_parallel_browsers_override if max_parallel_browsers_override is not None else current_settings.research_tool.max_parallel_browsers
save_dir_for_task = os.path.join(current_settings.research_tool.save_dir, task_id)
os.makedirs(save_dir_for_task, exist_ok=True)
logger.info(f"CLI Deep research save directory: {save_dir_for_task}")
logger.info(f"CLI Using max_parallel_browsers: {current_max_parallel_browsers}")
result_dict = await agent_instance.run(
topic=research_task_str, task_id=task_id,
save_dir=save_dir_for_task, max_parallel_browsers=current_max_parallel_browsers
)
report_file_path = result_dict.get("report_file_path")
if report_file_path and os.path.exists(report_file_path):
with open(report_file_path, "r", encoding="utf-8") as f:
markdown_content = f.read()
report_content = f"Deep research report generated successfully at {report_file_path}\n\n{markdown_content}"
logger.info(f"CLI Deep research task {task_id} completed. Report at {report_file_path}")
else:
report_content = f"Deep research completed, but report file not found. Result: {result_dict}"
logger.warning(f"CLI Deep research task {task_id} result: {result_dict}, report file path missing or invalid.")
except Exception as e:
logger.error(f"CLI Error in run_deep_research: {e}\n{traceback.format_exc()}")
report_content = f"Error: {e}"
return report_content
@app.command()
def run_browser_agent(
task: str = typer.Argument(..., help="The primary task or objective for the browser agent."),
):
"""Runs a browser agent task and prints the result."""
if not cli_state.settings:
typer.secho("Error: Application settings not loaded. Use --env-file or set environment variables.", fg=typer.colors.RED)
raise typer.Exit(code=1)
typer.secho(f"Executing browser agent task: {task}", fg=typer.colors.GREEN)
try:
result = asyncio.run(_run_browser_agent_logic_cli(task, cli_state.settings))
typer.secho("\n--- Agent Final Result ---", fg=typer.colors.BLUE, bold=True)
print(result)
except Exception as e:
typer.secho(f"CLI command failed: {e}", fg=typer.colors.RED)
logger.error(f"CLI run_browser_agent command failed: {e}\n{traceback.format_exc()}")
raise typer.Exit(code=1)
@app.command()
def run_deep_research(
research_task: str = typer.Argument(..., help="The topic or question for deep research."),
max_parallel_browsers: Optional[int] = typer.Option(None, "--max-parallel-browsers", "-p", help="Override max parallel browsers from settings.")
):
"""Performs deep web research and prints the report."""
if not cli_state.settings:
typer.secho("Error: Application settings not loaded. Use --env-file or set environment variables.", fg=typer.colors.RED)
raise typer.Exit(code=1)
typer.secho(f"Executing deep research task: {research_task}", fg=typer.colors.GREEN)
try:
result = asyncio.run(_run_deep_research_logic_cli(research_task, max_parallel_browsers, cli_state.settings))
typer.secho("\n--- Deep Research Final Report ---", fg=typer.colors.BLUE, bold=True)
print(result)
except Exception as e:
typer.secho(f"CLI command failed: {e}", fg=typer.colors.RED)
logger.error(f"CLI run_deep_research command failed: {e}\n{traceback.format_exc()}")
raise typer.Exit(code=1)
if __name__ == "__main__":
# This allows running `python src/mcp_server_browser_use/cli.py ...`
# Set a default log level if run directly for dev purposes, can be overridden by CLI args
if not os.getenv("MCP_SERVER_LOGGING_LEVEL"): # Check if already set
os.environ["MCP_SERVER_LOGGING_LEVEL"] = "DEBUG"
if not os.getenv("MCP_RESEARCH_TOOL_SAVE_DIR"): # Ensure mandatory var is set for local dev
print("Warning: MCP_RESEARCH_TOOL_SAVE_DIR not set. Defaulting to './tmp/deep_research_cli_default' for this run.", file=sys.stderr)
os.environ["MCP_RESEARCH_TOOL_SAVE_DIR"] = "./tmp/deep_research_cli_default"
app()
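# Example invocations (hedged; assumes the console script is named after the
# Typer app, with underscores converted to hyphens in command names):
#   mcp-browser-cli run-browser-agent "Open example.com and read the title"
#   mcp-browser-cli --env-file .env run-deep-research "History of MCP" -p 2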
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/server.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import logging
import os
import traceback
import uuid
from typing import Any, Dict, Optional
from pathlib import Path
from .config import settings # Import global AppSettings instance
# Configure logging using settings
log_level_str = settings.server.logging_level.upper()
numeric_level = getattr(logging, log_level_str, logging.INFO)
# Remove any existing handlers from the root logger to avoid duplicate messages
# if basicConfig was called elsewhere or by a library.
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
logging.basicConfig(
level=numeric_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
filename=settings.server.log_file if settings.server.log_file else None,
filemode="a" if settings.server.log_file else None, # only use filemode if filename is set
force=True # Override any previous basicConfig
)
logger = logging.getLogger("mcp_server_browser_use")
# Prevent log propagation if other loggers are configured higher up
# logging.getLogger().propagate = False # This might be too aggressive, let's rely on basicConfig force
from browser_use.browser.browser import BrowserConfig
from mcp.server.fastmcp import Context, FastMCP
# Import from _internal
from ._internal.agent.browser_use.browser_use_agent import BrowserUseAgent
from ._internal.agent.deep_research.deep_research_agent import DeepResearchAgent
from ._internal.browser.custom_browser import CustomBrowser
from ._internal.browser.custom_context import (
CustomBrowserContext,
CustomBrowserContextConfig,
)
from ._internal.controller.custom_controller import CustomController
from ._internal.utils import llm_provider as internal_llm_provider # aliased
from browser_use.agent.views import (
AgentHistoryList,
)
# Shared resources for MCP_BROWSER_KEEP_OPEN
shared_browser_instance: Optional[CustomBrowser] = None
shared_context_instance: Optional[CustomBrowserContext] = None
shared_controller_instance: Optional[CustomController] = None # Controller might also be shared
resource_lock = asyncio.Lock()
async def get_controller(ask_human_callback: Optional[Any] = None) -> CustomController:
"""Gets or creates a shared controller instance if keep_open is true, or a new one."""
global shared_controller_instance
if settings.browser.keep_open and shared_controller_instance:
# Potentially update callback if it can change per call, though usually fixed for server
return shared_controller_instance
controller = CustomController(ask_assistant_callback=ask_human_callback)
if settings.server.mcp_config:
try:
mcp_dict_config = settings.server.mcp_config
if isinstance(settings.server.mcp_config, str): # if passed as JSON string
mcp_dict_config = json.loads(settings.server.mcp_config)
await controller.setup_mcp_client(mcp_dict_config)
except Exception as e:
logger.error(f"Failed to setup MCP client for controller: {e}")
if settings.browser.keep_open:
shared_controller_instance = controller
return controller
async def get_browser_and_context() -> tuple[CustomBrowser, CustomBrowserContext]:
"""
Manages creation/reuse of CustomBrowser and CustomBrowserContext
based on settings.browser.keep_open and settings.browser.use_own_browser.
"""
global shared_browser_instance, shared_context_instance
current_browser: Optional[CustomBrowser] = None
current_context: Optional[CustomBrowserContext] = None
agent_headless_override = settings.agent_tool.headless
browser_headless = agent_headless_override if agent_headless_override is not None else settings.browser.headless
agent_disable_security_override = settings.agent_tool.disable_security
browser_disable_security = agent_disable_security_override if agent_disable_security_override is not None else settings.browser.disable_security
if settings.browser.use_own_browser and settings.browser.cdp_url:
logger.info(f"Connecting to own browser via CDP: {settings.browser.cdp_url}")
browser_cfg = BrowserConfig(
cdp_url=settings.browser.cdp_url,
wss_url=settings.browser.wss_url,
user_data_dir=settings.browser.user_data_dir, # Useful for CDP
# Headless, binary_path etc. are controlled by the user-launched browser
)
current_browser = CustomBrowser(config=browser_cfg)
# For CDP, context config is minimal, trace/recording might not apply or be harder to manage
context_cfg = CustomBrowserContextConfig(
trace_path=settings.browser.trace_path,
save_downloads_path=settings.paths.downloads,
save_recording_path=settings.agent_tool.save_recording_path if settings.agent_tool.enable_recording else None,
)
current_context = await current_browser.new_context(config=context_cfg)
elif settings.browser.keep_open:
if shared_browser_instance and shared_context_instance:
logger.info("Reusing shared browser and context.")
# Ensure browser is still connected
if not shared_browser_instance.is_connected():
logger.warning("Shared browser was disconnected. Recreating.")
if shared_context_instance: await shared_context_instance.close() # Close old context too
await shared_browser_instance.close() # Close browser after context
shared_browser_instance = None
shared_context_instance = None
else:
current_browser = shared_browser_instance
# For shared browser, we might want a new context or reuse.
# For simplicity, let's reuse the context if keep_open is true.
# If new context per call is needed, this logic would change.
current_context = shared_context_instance
if not current_browser or not current_context : # If shared instances were not valid or not yet created
logger.info("Creating new shared browser and context.")
browser_cfg = BrowserConfig(
headless=browser_headless,
disable_security=browser_disable_security,
browser_binary_path=settings.browser.binary_path,
user_data_dir=settings.browser.user_data_dir,
window_width=settings.browser.window_width,
window_height=settings.browser.window_height,
)
shared_browser_instance = CustomBrowser(config=browser_cfg)
context_cfg = CustomBrowserContextConfig(
trace_path=settings.browser.trace_path,
save_downloads_path=settings.paths.downloads,
save_recording_path=settings.agent_tool.save_recording_path if settings.agent_tool.enable_recording else None,
force_new_context=False # Important for shared context
)
shared_context_instance = await shared_browser_instance.new_context(config=context_cfg)
current_browser = shared_browser_instance
current_context = shared_context_instance
else: # Create new resources per call (not using own browser, not keeping open)
logger.info("Creating new browser and context for this call.")
browser_cfg = BrowserConfig(
headless=browser_headless,
disable_security=browser_disable_security,
browser_binary_path=settings.browser.binary_path,
user_data_dir=settings.browser.user_data_dir,
window_width=settings.browser.window_width,
window_height=settings.browser.window_height,
)
current_browser = CustomBrowser(config=browser_cfg)
context_cfg = CustomBrowserContextConfig(
trace_path=settings.browser.trace_path,
save_downloads_path=settings.paths.downloads,
save_recording_path=settings.agent_tool.save_recording_path if settings.agent_tool.enable_recording else None,
force_new_context=True
)
current_context = await current_browser.new_context(config=context_cfg)
if not current_browser or not current_context:
raise RuntimeError("Failed to initialize browser or context")
return current_browser, current_context
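# Resource strategy recap: CDP (use_own_browser) -> connect to the user's browser
# and never own the process; keep_open -> lazily create one shared browser/context
# and reuse it across calls; otherwise -> a fresh browser/context per tool call,
# torn down in the caller's finally block.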
def serve() -> FastMCP:
server = FastMCP("mcp_server_browser_use")
@server.tool()
async def run_browser_agent(ctx: Context, task: str) -> str:
logger.info(f"Received run_browser_agent task: {task[:100]}...")
agent_task_id = str(uuid.uuid4())
final_result = "Error: Agent execution failed."
browser_instance: Optional[CustomBrowser] = None
context_instance: Optional[CustomBrowserContext] = None
controller_instance: Optional[CustomController] = None
try:
async with resource_lock: # Protect shared resource access/creation
browser_instance, context_instance = await get_browser_and_context()
# For server, ask_human_callback is likely not interactive, can be None or a placeholder
controller_instance = await get_controller(ask_human_callback=None)
if not browser_instance or not context_instance or not controller_instance:
raise RuntimeError("Failed to acquire browser resources or controller.")
main_llm_config = settings.get_llm_config()
main_llm = internal_llm_provider.get_llm_model(**main_llm_config)
planner_llm = None
if settings.llm.planner_provider and settings.llm.planner_model_name:
planner_llm_config = settings.get_llm_config(is_planner=True)
planner_llm = internal_llm_provider.get_llm_model(**planner_llm_config)
agent_history_json_file = None
task_history_base_path = settings.agent_tool.history_path
if task_history_base_path:
task_specific_history_dir = Path(task_history_base_path) / agent_task_id
task_specific_history_dir.mkdir(parents=True, exist_ok=True)
agent_history_json_file = str(task_specific_history_dir / f"{agent_task_id}.json")
logger.info(f"Agent history will be saved to: {agent_history_json_file}")
agent_instance = BrowserUseAgent(
task=task,
llm=main_llm,
browser=browser_instance,
browser_context=context_instance,
controller=controller_instance,
planner_llm=planner_llm,
max_actions_per_step=settings.agent_tool.max_actions_per_step,
use_vision=settings.agent_tool.use_vision,
)
history: AgentHistoryList = await agent_instance.run(max_steps=settings.agent_tool.max_steps)
if agent_history_json_file:
agent_instance.save_history(agent_history_json_file)
final_result = history.final_result() or "Agent finished without a final result."
logger.info(f"Agent task completed. Result: {final_result[:100]}...")
except Exception as e:
logger.error(f"Error in run_browser_agent: {e}\n{traceback.format_exc()}")
final_result = f"Error: {e}"
finally:
if not settings.browser.keep_open and not settings.browser.use_own_browser:
logger.info("Closing browser resources for this call.")
if context_instance:
await context_instance.close()
if browser_instance:
await browser_instance.close()
if controller_instance: # Close controller only if not shared
await controller_instance.close_mcp_client()
elif settings.browser.use_own_browser: # Own browser, only close controller if not shared
if controller_instance and not (settings.browser.keep_open and controller_instance == shared_controller_instance):
await controller_instance.close_mcp_client()
return final_result
@server.tool()
async def run_deep_research(
ctx: Context,
research_task: str,
max_parallel_browsers_override: Optional[int] = None,
) -> str:
logger.info(f"Received run_deep_research task: {research_task[:100]}...")
task_id = str(uuid.uuid4()) # This task_id is used for the sub-directory name
report_content = "Error: Deep research failed."
try:
main_llm_config = settings.get_llm_config() # Deep research uses main LLM config
research_llm = internal_llm_provider.get_llm_model(**main_llm_config)
# Prepare browser_config dict for DeepResearchAgent's sub-agents
dr_browser_cfg = {
"headless": settings.browser.headless, # Use general browser headless for sub-tasks
"disable_security": settings.browser.disable_security,
"browser_binary_path": settings.browser.binary_path,
"user_data_dir": settings.browser.user_data_dir,
"window_width": settings.browser.window_width,
"window_height": settings.browser.window_height,
"trace_path": settings.browser.trace_path, # For sub-agent traces
"save_downloads_path": settings.paths.downloads, # For sub-agent downloads
}
if settings.browser.use_own_browser and settings.browser.cdp_url:
# If main browser is CDP, sub-agents should also use it
dr_browser_cfg["cdp_url"] = settings.browser.cdp_url
dr_browser_cfg["wss_url"] = settings.browser.wss_url
mcp_server_config_for_agent = None
if settings.server.mcp_config:
mcp_server_config_for_agent = settings.server.mcp_config
if isinstance(settings.server.mcp_config, str):
mcp_server_config_for_agent = json.loads(settings.server.mcp_config)
agent_instance = DeepResearchAgent(
llm=research_llm,
browser_config=dr_browser_cfg,
mcp_server_config=mcp_server_config_for_agent,
)
current_max_parallel_browsers = max_parallel_browsers_override if max_parallel_browsers_override is not None else settings.research_tool.max_parallel_browsers
# Check if save_dir is provided, otherwise use in-memory approach
save_dir_for_this_task = None
if settings.research_tool.save_dir:
# If save_dir is provided, construct the full save directory path for this specific task
save_dir_for_this_task = str(Path(settings.research_tool.save_dir) / task_id)
logger.info(f"Deep research save directory for this task: {save_dir_for_this_task}")
else:
logger.info("No save_dir configured. Deep research will operate in memory-only mode.")
logger.info(f"Using max_parallel_browsers: {current_max_parallel_browsers}")
result_dict = await agent_instance.run(
topic=research_task,
save_dir=save_dir_for_this_task, # Can be None now
task_id=task_id, # Pass the generated task_id
max_parallel_browsers=current_max_parallel_browsers
)
# Handle the result based on whether files were saved or not
if save_dir_for_this_task and result_dict.get("report_file_path") and Path(result_dict["report_file_path"]).exists():
with open(result_dict["report_file_path"], "r", encoding="utf-8") as f:
markdown_content = f.read()
report_content = f"Deep research report generated successfully at {result_dict['report_file_path']}\n\n{markdown_content}"
logger.info(f"Deep research task {task_id} completed. Report at {result_dict['report_file_path']}")
elif result_dict.get("status") == "completed" and result_dict.get("final_report"):
report_content = f"Deep research completed. Report content:\n\n{result_dict['final_report']}"
if result_dict.get("report_file_path"):
report_content += f"\n(Expected report file at: {result_dict['report_file_path']})"
logger.info(f"Deep research task {task_id} completed. Report content retrieved directly.")
else:
report_content = f"Deep research task {task_id} result: {result_dict}. Report file not found or content not available."
logger.warning(report_content)
except Exception as e:
logger.error(f"Error in run_deep_research: {e}\n{traceback.format_exc()}")
report_content = f"Error: {e}"
return report_content
return server
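# Hypothetical sketch of how an MCP client might invoke the tools registered
# above, assuming a connected `mcp.ClientSession` named `session` (names and
# arguments below are illustrative, not part of this module):
#   result = await session.call_tool(
#       "run_browser_agent", {"task": "Find the latest Python release notes"}
#   )
#   report = await session.call_tool(
#       "run_deep_research", {"research_task": "State of WebAssembly tooling"}
#   )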
server_instance = serve() # Renamed from 'server' to avoid conflict with 'settings.server'
def main():
logger.info("Starting MCP server for browser-use...")
try:
# Just log the Research tool save directory if it's configured
if settings.research_tool.save_dir:
logger.info(f"Research tool save directory configured: {settings.research_tool.save_dir}")
else:
logger.info("Research tool save directory not configured. Deep research will operate in memory-only mode.")
except Exception as e:
logger.error(f"Configuration error: {e}")
return # Exit if there's a configuration error
logger.info(f"Loaded settings with LLM provider: {settings.llm.provider}, Model: {settings.llm.model_name}")
logger.info(f"Browser keep_open: {settings.browser.keep_open}, Use own browser: {settings.browser.use_own_browser}")
if settings.browser.use_own_browser:
logger.info(f"Connecting to own browser via CDP: {settings.browser.cdp_url}")
server_instance.run()
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/deep_research/deep_research_agent.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import logging
import os
import uuid
from pathlib import Path
from typing import List, Dict, Any, TypedDict, Optional, Sequence, Annotated
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Langchain imports
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import Tool, StructuredTool
from langchain.agents import AgentExecutor # We might use parts, but Langgraph is primary
from langchain_community.tools.file_management import WriteFileTool, ReadFileTool, CopyFileTool, ListDirectoryTool, \
MoveFileTool, FileSearchTool
from langchain_openai import ChatOpenAI # Replace with your actual LLM import
from pydantic import BaseModel, Field
import operator
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextWindowSize
# Langgraph imports
from langgraph.graph import StateGraph, END
from ...controller.custom_controller import CustomController
from ...utils import llm_provider
from ...browser.custom_browser import CustomBrowser
from ...browser.custom_context import CustomBrowserContext, CustomBrowserContextConfig
from ...agent.browser_use.browser_use_agent import BrowserUseAgent
from ...utils.mcp_client import setup_mcp_client_and_tools
logger = logging.getLogger(__name__)
# Constants
REPORT_FILENAME = "report.md"
PLAN_FILENAME = "research_plan.md"
SEARCH_INFO_FILENAME = "search_info.json"
_AGENT_STOP_FLAGS = {}
_BROWSER_AGENT_INSTANCES = {}
async def run_single_browser_task(
task_query: str,
task_id: str,
llm: Any, # Pass the main LLM
browser_config: Dict[str, Any],
stop_event: threading.Event,
use_vision: bool = False,
) -> Dict[str, Any]:
"""
Runs a single BrowserUseAgent task.
Manages browser creation and closing for this specific task.
"""
if not BrowserUseAgent:
return {"query": task_query, "error": "BrowserUseAgent components not available."}
# --- Browser Setup ---
# These should ideally come from the main agent's config
headless = browser_config.get("headless", False)
window_w = browser_config.get("window_width", 1280)
window_h = browser_config.get("window_height", 1100)
browser_user_data_dir = browser_config.get("user_data_dir", None)
use_own_browser = browser_config.get("use_own_browser", False)
browser_binary_path = browser_config.get("browser_binary_path", None)
wss_url = browser_config.get("wss_url", None)
cdp_url = browser_config.get("cdp_url", None)
disable_security = browser_config.get("disable_security", False)
save_downloads_path = browser_config.get("save_downloads_path", None)
trace_path = browser_config.get("trace_path", None)
bu_browser = None
bu_browser_context = None
task_key = None  # Registry key; set once the agent instance is created below.
try:
logger.info(f"Starting browser task for query: {task_query}")
extra_args = [f"--window-size={window_w},{window_h}"]
if browser_user_data_dir:
extra_args.append(f"--user-data-dir={browser_user_data_dir}")
if use_own_browser:
browser_binary_path = os.getenv("CHROME_PATH", None) or browser_binary_path
if browser_binary_path == "": browser_binary_path = None
chrome_user_data = os.getenv("CHROME_USER_DATA", None)
if chrome_user_data: extra_args += [f"--user-data-dir={chrome_user_data}"]
else:
browser_binary_path = None
bu_browser = CustomBrowser(
config=BrowserConfig(
headless=headless,
disable_security=disable_security,
browser_binary_path=browser_binary_path,
extra_browser_args=extra_args,
wss_url=wss_url,
cdp_url=cdp_url,
)
)
context_config = CustomBrowserContextConfig(
save_downloads_path=save_downloads_path,
trace_path=trace_path,
browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h),
force_new_context=True
)
bu_browser_context = await bu_browser.new_context(config=context_config)
# Simple controller example, replace with your actual implementation if needed
bu_controller = CustomController()
# Construct the task prompt for BrowserUseAgent
# Instruct it to find specific info and return title/URL
bu_task_prompt = f"""
Research Task: {task_query}
Objective: Find relevant information answering the query.
Output Requirements: For each relevant piece of information found, please provide:
1. A concise summary of the information.
2. The title of the source page or document.
3. The URL of the source.
Focus on accuracy and relevance. Avoid irrelevant details.
PDFs cannot be read directly in the browser. Download the PDF first, then use read_file to extract its content; if you cannot save or read it, try other methods.
"""
bu_agent_instance = BrowserUseAgent(
task=bu_task_prompt,
llm=llm, # Use the passed LLM
browser=bu_browser,
browser_context=bu_browser_context,
controller=bu_controller,
use_vision=use_vision,
)
# Store instance for potential stop() call
task_key = f"{task_id}_{uuid.uuid4()}"
_BROWSER_AGENT_INSTANCES[task_key] = bu_agent_instance
# --- Run with Stop Check ---
# BrowserUseAgent needs to internally check a stop signal or have a stop method.
# We simulate checking before starting and assume `run` might be interruptible
# or have its own stop mechanism we can trigger via bu_agent_instance.stop().
if stop_event.is_set():
logger.info(f"Browser task for '{task_query}' cancelled before start.")
return {"query": task_query, "result": None, "status": "cancelled"}
# The run needs to be awaitable and ideally accept a stop signal or have a .stop() method
# result = await bu_agent_instance.run(max_steps=max_steps) # Add max_steps if applicable
# Let's assume a simplified run for now
logger.info(f"Running BrowserUseAgent for: {task_query}")
result = await bu_agent_instance.run() # Assuming run is the main method
logger.info(f"BrowserUseAgent finished for: {task_query}")
final_data = result.final_result()
if stop_event.is_set():
logger.info(f"Browser task for '{task_query}' stopped during execution.")
return {"query": task_query, "result": final_data, "status": "stopped"}
else:
logger.info(f"Browser result for '{task_query}': {final_data}")
return {"query": task_query, "result": final_data, "status": "completed"}
except Exception as e:
logger.error(f"Error during browser task for query '{task_query}': {e}", exc_info=True)
return {"query": task_query, "error": str(e), "status": "failed"}
finally:
if bu_browser_context:
try:
await bu_browser_context.close()
bu_browser_context = None
logger.info("Closed browser context.")
except Exception as e:
logger.error(f"Error closing browser context: {e}")
if bu_browser:
try:
await bu_browser._close_without_httpxclients()
bu_browser = None
logger.info("Closed browser.")
except Exception as e:
logger.error(f"Error closing browser: {e}")
if task_key and task_key in _BROWSER_AGENT_INSTANCES:
del _BROWSER_AGENT_INSTANCES[task_key]
class BrowserSearchInput(BaseModel):
queries: List[str] = Field(
description=f"List of distinct search queries to find information relevant to the research task.")
async def _run_browser_search_tool(
queries: List[str],
task_id: str, # Injected dependency
llm: Any, # Injected dependency
browser_config: Dict[str, Any],
stop_event: threading.Event,
max_parallel_browsers: int = 1
) -> List[Dict[str, Any]]:
"""
Internal function to execute parallel browser searches based on LLM-provided queries.
Handles concurrency and stop signals.
"""
# Limit queries just in case LLM ignores the description
queries = queries[:max_parallel_browsers]
logger.info(f"[Browser Tool {task_id}] Running search for {len(queries)} queries: {queries}")
results = []
semaphore = asyncio.Semaphore(max_parallel_browsers)
async def task_wrapper(query):
async with semaphore:
if stop_event.is_set():
logger.info(f"[Browser Tool {task_id}] Skipping task due to stop signal: {query}")
return {"query": query, "result": None, "status": "cancelled"}
# Pass necessary injected configs and the stop event
return await run_single_browser_task(
query,
task_id,
llm, # Pass the main LLM (or a dedicated one if needed)
browser_config,
stop_event
# use_vision could be added here if needed
)
tasks = [task_wrapper(query) for query in queries]
search_results = await asyncio.gather(*tasks, return_exceptions=True)
processed_results = []
for i, res in enumerate(search_results):
query = queries[i] # Get corresponding query
if isinstance(res, Exception):
logger.error(f"[Browser Tool {task_id}] Gather caught exception for query '{query}': {res}", exc_info=True)
processed_results.append({"query": query, "error": str(res), "status": "failed"})
elif isinstance(res, dict):
processed_results.append(res)
else:
logger.error(f"[Browser Tool {task_id}] Unexpected result type for query '{query}': {type(res)}")
processed_results.append({"query": query, "error": "Unexpected result type", "status": "failed"})
logger.info(f"[Browser Tool {task_id}] Finished search. Results count: {len(processed_results)}")
return processed_results
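# Each entry in the returned list is shaped roughly like:
#   {"query": "...", "result": <agent final result or None>,
#    "status": "completed" | "cancelled" | "stopped" | "failed"}
# Failed entries carry an "error" key instead of "result".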
def create_browser_search_tool(
llm: Any,
browser_config: Dict[str, Any],
task_id: str,
stop_event: threading.Event,
max_parallel_browsers: int = 1,
) -> StructuredTool:
"""Factory function to create the browser search tool with necessary dependencies."""
# Use partial to bind the dependencies that aren't part of the LLM call arguments
from functools import partial
bound_tool_func = partial(
_run_browser_search_tool,
task_id=task_id,
llm=llm,
browser_config=browser_config,
stop_event=stop_event,
max_parallel_browsers=max_parallel_browsers
)
return StructuredTool.from_function(
coroutine=bound_tool_func,
name="parallel_browser_search",
description=f"""Use this tool to actively search the web for information related to a specific research task or question.
It runs up to {max_parallel_browsers} searches in parallel using a browser agent for better results than simple scraping.
Provide a list of distinct search queries (up to {max_parallel_browsers}) that are likely to yield relevant information.""",
args_schema=BrowserSearchInput,
)
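# Minimal usage sketch for the factory above (values are hypothetical; `my_llm`
# stands in for any LangChain-compatible chat model):
#   stop_event = threading.Event()
#   search_tool = create_browser_search_tool(
#       llm=my_llm, browser_config={"headless": True},
#       task_id="demo-task", stop_event=stop_event, max_parallel_browsers=2,
#   )
#   results = await search_tool.ainvoke({"queries": ["query one", "query two"]})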
# --- Langgraph State Definition ---
class ResearchPlanItem(TypedDict):
step: int
task: str
status: str # "pending", "completed", "failed"
queries: Optional[List[str]] # Queries generated for this task
result_summary: Optional[str] # Optional brief summary after execution
class DeepResearchState(TypedDict):
task_id: str
topic: str
research_plan: List[ResearchPlanItem]
search_results: List[Dict[str, Any]] # Stores results from browser_search_tool_func
# messages: Sequence[BaseMessage] # History for ReAct-like steps within nodes
llm: Any # The LLM instance
tools: List[Tool]
output_dir: Optional[str]  # Task artifact directory (string path), or None in memory-only mode
browser_config: Dict[str, Any]
final_report: Optional[str]
current_step_index: int # To track progress through the plan
stop_requested: bool # Flag to signal termination
# Add other state variables as needed
error_message: Optional[str] # To store errors
messages: List[BaseMessage]
# --- Langgraph Nodes ---
def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]:
"""Loads state from files if they exist."""
state_updates = {}
plan_file = os.path.join(output_dir, PLAN_FILENAME)
search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME)
if os.path.exists(plan_file):
try:
with open(plan_file, 'r', encoding='utf-8') as f:
# Basic parsing, assumes markdown checklist format
plan = []
step = 1
for line in f:
line = line.strip()
if line.startswith(("- [x]", "- [ ]")):
status = "completed" if line.startswith("- [x]") else "pending"
task = line[5:].strip()
plan.append(
ResearchPlanItem(step=step, task=task, status=status, queries=None, result_summary=None))
step += 1
state_updates['research_plan'] = plan
# Determine next step index based on loaded plan
next_step = next((i for i, item in enumerate(plan) if item['status'] == 'pending'), len(plan))
state_updates['current_step_index'] = next_step
logger.info(f"Loaded research plan from {plan_file}, next step index: {next_step}")
except Exception as e:
logger.error(f"Failed to load or parse research plan {plan_file}: {e}")
state_updates['error_message'] = f"Failed to load research plan: {e}"
if os.path.exists(search_file):
try:
with open(search_file, 'r', encoding='utf-8') as f:
state_updates['search_results'] = json.load(f)
logger.info(f"Loaded search results from {search_file}")
except Exception as e:
logger.error(f"Failed to load search results {search_file}: {e}")
state_updates['error_message'] = f"Failed to load search results: {e}"
# Decide if this is fatal or if we can continue without old results
return state_updates
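# The parser above expects research_plan.md to be the markdown checklist that
# _save_plan_to_md writes, e.g.:
#   # Research Plan
#   - [x] Define the core concepts and terminology related to the topic.
#   - [ ] Identify the key historical developments of the topic.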
def _save_plan_to_md(plan: List[ResearchPlanItem], output_dir: str):
"""Saves the research plan to a markdown checklist file."""
plan_file = os.path.join(output_dir, PLAN_FILENAME)
try:
with open(plan_file, 'w', encoding='utf-8') as f:
f.write("# Research Plan\n\n")
for item in plan:
marker = "- [x]" if item['status'] == 'completed' else "- [ ]"
f.write(f"{marker} {item['task']}\n")
logger.info(f"Research plan saved to {plan_file}")
except Exception as e:
logger.error(f"Failed to save research plan to {plan_file}: {e}")
def _save_search_results_to_json(results: List[Dict[str, Any]], output_dir: str):
"""Appends or overwrites search results to a JSON file."""
search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME)
try:
# Simple overwrite for now, could be append
with open(search_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
logger.info(f"Search results saved to {search_file}")
except Exception as e:
logger.error(f"Failed to save search results to {search_file}: {e}")
def _save_report_to_md(report: str, output_dir: str):
"""Saves the final report to a markdown file."""
report_file = os.path.join(output_dir, REPORT_FILENAME)
try:
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
logger.info(f"Final report saved to {report_file}")
except Exception as e:
logger.error(f"Failed to save final report to {report_file}: {e}")
async def planning_node(state: DeepResearchState) -> Dict[str, Any]:
"""Generates the initial research plan or refines it if resuming."""
logger.info("--- Entering Planning Node ---")
if state.get('stop_requested'):
logger.info("Stop requested, skipping planning.")
return {"stop_requested": True}
llm = state['llm']
topic = state['topic']
existing_plan = state.get('research_plan')
existing_results = state.get('search_results')
output_dir = state['output_dir']
if existing_plan and state.get('current_step_index', 0) > 0:
logger.info("Resuming with existing plan.")
# Maybe add logic here to let LLM review and potentially adjust the plan
# based on existing_results, but for now, we just use the loaded plan.
if output_dir:
_save_plan_to_md(existing_plan, output_dir) # Ensure it's saved initially if output_dir exists
return {"research_plan": existing_plan} # Return the loaded plan
logger.info(f"Generating new research plan for topic: {topic}")
prompt = ChatPromptTemplate.from_messages([
("system", """You are a meticulous research assistant. Your goal is to create a step-by-step research plan to thoroughly investigate a given topic.
The plan should consist of clear, actionable research tasks or questions. Each step should logically build towards a comprehensive understanding.
Format the output as a numbered list. Each item should represent a distinct research step or question.
Example:
1. Define the core concepts and terminology related to [Topic].
2. Identify the key historical developments of [Topic].
3. Analyze the current state-of-the-art and recent advancements in [Topic].
4. Investigate the major challenges and limitations associated with [Topic].
5. Explore the future trends and potential applications of [Topic].
6. Summarize the findings and draw conclusions.
Keep the plan focused and manageable. Aim for 5-10 detailed steps.
"""),
("human", f"Generate a research plan for the topic: {topic}")
])
try:
response = await llm.ainvoke(prompt.format_prompt(topic=topic).to_messages())
plan_text = response.content
# Parse the numbered list into the plan structure
new_plan: List[ResearchPlanItem] = []
for i, line in enumerate(plan_text.strip().split('\n')):
line = line.strip()
if line and (line[0].isdigit() or line.startswith(("*", "-"))):
# Simple parsing: remove number/bullet and space
task_text = line.split('.', 1)[-1].strip() if line[0].isdigit() else line[1:].strip()
if task_text:
new_plan.append(ResearchPlanItem(
step=len(new_plan) + 1,  # Number steps by accepted items, not raw line index.
task=task_text,
status="pending",
queries=None,
result_summary=None
))
if not new_plan:
logger.error("LLM failed to generate a valid plan structure.")
return {"error_message": "Failed to generate research plan structure."}
logger.info(f"Generated research plan with {len(new_plan)} steps.")
if output_dir:
_save_plan_to_md(new_plan, output_dir)
return {
"research_plan": new_plan,
"current_step_index": 0, # Start from the beginning
"search_results": [], # Initialize search results
}
except Exception as e:
logger.error(f"Error during planning: {e}", exc_info=True)
return {"error_message": f"LLM Error during planning: {e}"}
async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]:
"""
Executes the next step in the research plan by invoking the LLM with tools.
The LLM decides which tool (e.g., browser search) to use and provides arguments.
"""
logger.info("--- Entering Research Execution Node ---")
if state.get('stop_requested'):
logger.info("Stop requested, skipping research execution.")
return {"stop_requested": True, "current_step_index": state['current_step_index']} # Keep index same
plan = state['research_plan']
current_index = state['current_step_index']
llm = state['llm']
tools = state['tools'] # Tools are now passed in state
output_dir = str(state['output_dir']) if state['output_dir'] else None
task_id = state['task_id']
# Stop event is bound inside the tool function, no need to pass directly here
if not plan or current_index >= len(plan):
logger.info("Research plan complete or empty.")
# This condition should ideally be caught by `should_continue` before reaching here
return {}
current_step = plan[current_index]
if current_step['status'] == 'completed':
logger.info(f"Step {current_step['step']} already completed, skipping.")
return {"current_step_index": current_index + 1} # Move to next step
logger.info(f"Executing research step {current_step['step']}: {current_step['task']}")
# Bind tools to the LLM for this call
llm_with_tools = llm.bind_tools(tools)
if state['messages']:
current_task_message = [HumanMessage(
content=f"Research Task (Step {current_step['step']}): {current_step['task']}")]
invocation_messages = state['messages'] + current_task_message
else:
current_task_message = [
SystemMessage(
content="You are a research assistant executing one step of a research plan. Use the available tools, especially the 'parallel_browser_search' tool, to gather information needed for the current task. Be precise with your search queries if using the browser tool."),
HumanMessage(
content=f"Research Task (Step {current_step['step']}): {current_step['task']}")
]
invocation_messages = current_task_message
try:
# Invoke the LLM, expecting it to make a tool call
logger.info(f"Invoking LLM with tools for task: {current_step['task']}")
ai_response: BaseMessage = await llm_with_tools.ainvoke(invocation_messages)
logger.info("LLM invocation complete.")
tool_results = []
executed_tool_names = []
if not isinstance(ai_response, AIMessage) or not ai_response.tool_calls:
# LLM didn't call a tool. Maybe it answered directly? Or failed?
logger.warning(
f"LLM did not call any tool for step {current_step['step']}. Response: {ai_response.content[:100]}...")
# How to handle this? Mark step as failed? Or store the content?
# Let's mark as failed for now, assuming a tool was expected.
current_step['status'] = 'failed'
current_step['result_summary'] = "LLM did not use a tool as expected."
if output_dir:
_save_plan_to_md(plan, output_dir)
return {
"research_plan": plan,
"current_step_index": current_index + 1,
"error_message": f"LLM failed to call a tool for step {current_step['step']}."
}
# Process tool calls
for tool_call in ai_response.tool_calls:
tool_name = tool_call.get("name")
tool_args = tool_call.get("args", {})
tool_call_id = tool_call.get("id") # Important for ToolMessage
logger.info(f"LLM requested tool call: {tool_name} with args: {tool_args}")
executed_tool_names.append(tool_name)
# Find the corresponding tool instance
selected_tool = next((t for t in tools if t.name == tool_name), None)
if not selected_tool:
logger.error(f"LLM called tool '{tool_name}' which is not available.")
# Create a ToolMessage indicating the error
tool_results.append(ToolMessage(
content=f"Error: Tool '{tool_name}' not found.",
tool_call_id=tool_call_id
))
continue # Skip to next tool call if any
# Execute the tool
try:
# Stop check before executing the tool (tool itself also checks)
stop_event = _AGENT_STOP_FLAGS.get(task_id)
if stop_event and stop_event.is_set():
logger.info(f"Stop requested before executing tool: {tool_name}")
current_step['status'] = 'pending' # Not completed due to stop
if output_dir:
_save_plan_to_md(plan, output_dir)
return {"stop_requested": True, "research_plan": plan}
logger.info(f"Executing tool: {tool_name}")
# Assuming tool functions handle async correctly
tool_output = await selected_tool.ainvoke(tool_args)
logger.info(f"Tool '{tool_name}' executed successfully.")
browser_tool_called = "parallel_browser_search" in executed_tool_names
# Append result to overall search results
current_search_results = state.get('search_results', [])
if browser_tool_called: # Specific handling for browser tool output
current_search_results.extend(tool_output)
else: # Handle other tool outputs (e.g., file tools return strings)
# Store it associated with the step? Or a generic log?
# Let's just log it for now. Need better handling for diverse tool outputs.
logger.info(f"Result from tool '{tool_name}': {str(tool_output)[:200]}...")
# Store result for potential next LLM call (if we were doing multi-turn)
tool_results.append(ToolMessage(
content=json.dumps(tool_output),
tool_call_id=tool_call_id
))
except Exception as e:
logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True)
tool_results.append(ToolMessage(
content=f"Error executing tool {tool_name}: {e}",
tool_call_id=tool_call_id
))
# Also update overall state search_results with error?
current_search_results = state.get('search_results', [])
current_search_results.append(
{"tool_name": tool_name, "args": tool_args, "status": "failed", "error": str(e)})
# Basic check: Did the browser tool run at all? (More specific checks needed)
browser_tool_called = "parallel_browser_search" in executed_tool_names
# We might need a more nuanced status based on the *content* of tool_results
step_failed = any("Error:" in str(tr.content) for tr in tool_results) or not browser_tool_called
if step_failed:
logger.warning(f"Step {current_step['step']} failed or did not yield results via browser search.")
current_step['status'] = 'failed'
current_step['result_summary'] = f"Tool execution failed or browser tool not used. Errors: {[tr.content for tr in tool_results if 'Error' in str(tr.content)]}"
else:
logger.info(f"Step {current_step['step']} completed using tool(s): {executed_tool_names}.")
current_step['status'] = 'completed'
current_step['result_summary'] = f"Executed tool(s): {', '.join(executed_tool_names)}."
if output_dir:
_save_plan_to_md(plan, output_dir)
_save_search_results_to_json(current_search_results, output_dir)
return {
"research_plan": plan,
"search_results": current_search_results, # Update with new results
"current_step_index": current_index + 1,
"messages": state["messages"] + current_task_message + [ai_response] + tool_results,
# Optionally return the tool_results messages if needed by downstream nodes
}
except Exception as e:
logger.error(f"Unhandled error during research execution node for step {current_step['step']}: {e}",
exc_info=True)
current_step['status'] = 'failed'
if output_dir:
_save_plan_to_md(plan, output_dir)
return {
"research_plan": plan,
"current_step_index": current_index + 1, # Move on even if error?
"error_message": f"Core Execution Error on step {current_step['step']}: {e}"
}
async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]:
"""Synthesizes the final report from the collected search results."""
logger.info("--- Entering Synthesis Node ---")
if state.get('stop_requested'):
logger.info("Stop requested, skipping synthesis.")
return {"stop_requested": True}
llm = state['llm']
topic = state['topic']
search_results = state.get('search_results', [])
output_dir = state['output_dir']
plan = state['research_plan'] # Include plan for context
if not search_results:
logger.warning("No search results found to synthesize report.")
report = f"# Research Report: {topic}\n\nNo information was gathered during the research process."
if output_dir:
_save_report_to_md(report, output_dir)
return {"final_report": report}
logger.info(f"Synthesizing report from {len(search_results)} collected search result entries.")
# Prepare context for the LLM
# Format search results nicely, maybe group by query or original plan step
formatted_results = ""
references = {}
ref_count = 1
for i, result_entry in enumerate(search_results):
query = result_entry.get('query', 'Unknown Query')
status = result_entry.get('status', 'unknown')
result_data = result_entry.get('result') # This should be the dict with summary, title, url
error = result_entry.get('error')
if status == 'completed' and result_data:
summary = result_data
formatted_results += f"### Finding from Query: \"{query}\"\n"
formatted_results += f"- **Summary:**\n{summary}\n"
formatted_results += "---\n"
elif status == 'failed':
formatted_results += f"### Failed Query: \"{query}\"\n"
formatted_results += f"- **Error:** {error}\n"
formatted_results += "---\n"
# Ignore cancelled/other statuses for the report content
# Prepare the research plan context
plan_summary = "\nResearch Plan Followed:\n"
for item in plan:
marker = "- [x]" if item['status'] == 'completed' else "- [ ] (Failed)" if item[
'status'] == 'failed' else "- [ ]"
plan_summary += f"{marker} {item['task']}\n"
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """You are a professional researcher tasked with writing a comprehensive and well-structured report based on collected findings.
The report should address the research topic thoroughly, synthesizing the information gathered from various sources.
Structure the report logically:
1. **Introduction:** Briefly introduce the topic and the report's scope (mentioning the research plan followed is good).
2. **Main Body:** Discuss the key findings, organizing them thematically or according to the research plan steps. Analyze, compare, and contrast information from different sources where applicable. **Crucially, cite your sources using bracketed numbers [X] corresponding to the reference list.**
3. **Conclusion:** Summarize the main points and offer concluding thoughts or potential areas for further research.
Ensure the tone is objective, professional, and analytical. Base the report **strictly** on the provided findings. Do not add external knowledge. If findings are contradictory or incomplete, acknowledge this.
"""),
("human", f"""
**Research Topic:** {topic}
{plan_summary}
**Collected Findings:**
```
{formatted_results}
```
Please generate the final research report in Markdown format based **only** on the information above. Ensure all claims derived from the findings are properly cited using the format [Reference_ID].
""")
])
try:
response = await llm.ainvoke(synthesis_prompt.format_prompt(
topic=topic,
plan_summary=plan_summary,
formatted_results=formatted_results,
references=references
).to_messages())
final_report_md = response.content
# Append the reference list automatically to the end of the generated markdown
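# NOTE: `references` and `ref_count` are never populated earlier in this
# function, so this block is currently a no-op until reference extraction
# from the search results is implemented.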
if references:
report_references_section = "\n\n## References\n\n"
# Sort refs by ID for consistent output
sorted_refs = sorted(references.values(), key=lambda x: x['id'])
for ref in sorted_refs:
report_references_section += f"[{ref['id']}] {ref['title']} - {ref['url']}\n"
final_report_md += report_references_section
logger.info("Successfully synthesized the final report.")
if output_dir:
_save_report_to_md(final_report_md, output_dir)
return {"final_report": final_report_md}
except Exception as e:
logger.error(f"Error during report synthesis: {e}", exc_info=True)
return {"error_message": f"LLM Error during synthesis: {e}"}
# --- Langgraph Edges and Conditional Logic ---
def should_continue(state: DeepResearchState) -> str:
"""Determines the next step based on the current state."""
logger.info("--- Evaluating Condition: Should Continue? ---")
if state.get('stop_requested'):
logger.info("Stop requested, routing to END.")
return "end_run" # Go to a dedicated end node for cleanup if needed
if state.get('error_message'):
logger.warning(f"Error detected: {state['error_message']}. Routing to END.")
# Decide if errors should halt execution or if it should try to synthesize anyway
return "end_run" # Stop on error for now
plan = state.get('research_plan')
current_index = state.get('current_step_index', 0)
if not plan:
logger.warning("No research plan found, cannot continue execution. Routing to END.")
return "end_run" # Should not happen if planning node ran correctly
# Check if there are pending steps in the plan
if current_index < len(plan):
logger.info(
f"Plan has pending steps (current index {current_index}/{len(plan)}). Routing to Research Execution.")
return "execute_research"
else:
logger.info("All plan steps processed. Routing to Synthesis.")
return "synthesize_report"
# --- DeepResearchAgent Class ---
class DeepResearchAgent:
def __init__(self, llm: Any, browser_config: Dict[str, Any], mcp_server_config: Optional[Dict[str, Any]] = None):
"""
Initializes the DeepResearchAgent.
Args:
llm: The Langchain compatible language model instance.
browser_config: Configuration dictionary for the BrowserUseAgent tool.
Example: {"headless": True, "window_width": 1280, ...}
mcp_server_config: Optional configuration for the MCP client.
"""
self.llm = llm
self.browser_config = browser_config
self.mcp_server_config = mcp_server_config
self.mcp_client = None
self.stopped = False
self.graph = self._compile_graph()
self.current_task_id: Optional[str] = None
self.stop_event: Optional[threading.Event] = None
self.runner: Optional[asyncio.Task] = None # To hold the asyncio task for run
async def _setup_tools(self, task_id: str, stop_event: threading.Event, max_parallel_browsers: int = 1) -> List[Tool]:
"""Sets up the basic tools (File I/O) and optional MCP tools."""
tools = [WriteFileTool(), ReadFileTool(), ListDirectoryTool()] # Basic file operations
browser_use_tool = create_browser_search_tool(
llm=self.llm,
browser_config=self.browser_config,
task_id=task_id,
stop_event=stop_event,
max_parallel_browsers=max_parallel_browsers
)
tools += [browser_use_tool]
# Add MCP tools if config is provided
if self.mcp_server_config:
try:
logger.info("Setting up MCP client and tools...")
if not self.mcp_client:
self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config)
mcp_tools = self.mcp_client.get_tools()
logger.info(f"Loaded {len(mcp_tools)} MCP tools.")
tools.extend(mcp_tools)
except Exception as e:
logger.error(f"Failed to set up MCP tools: {e}", exc_info=True)
tools_map = {tool.name: tool for tool in tools}
return list(tools_map.values())
async def close_mcp_client(self):
if self.mcp_client:
await self.mcp_client.__aexit__(None, None, None)
self.mcp_client = None
def _compile_graph(self) -> StateGraph:
"""Compiles the Langgraph state machine."""
workflow = StateGraph(DeepResearchState)
# Add nodes
workflow.add_node("plan_research", planning_node)
workflow.add_node("execute_research", research_execution_node)
workflow.add_node("synthesize_report", synthesis_node)
workflow.add_node("end_run", lambda state: logger.info("--- Reached End Run Node ---") or {}) # Simple end node
# Define edges
workflow.set_entry_point("plan_research")
workflow.add_edge("plan_research", "execute_research") # Always execute after planning
# Conditional edge after execution
workflow.add_conditional_edges(
"execute_research",
should_continue,
{
"execute_research": "execute_research", # Loop back if more steps
"synthesize_report": "synthesize_report", # Move to synthesis if done
"end_run": "end_run" # End if stop requested or error
}
)
workflow.add_edge("synthesize_report", "end_run") # End after synthesis
app = workflow.compile()
return app
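# Resulting graph topology (sketch):
#   plan_research -> execute_research
#   execute_research --should_continue--> execute_research   (loop over plan steps)
#                                      \-> synthesize_report -> end_run
#                                      \-> end_run            (stop requested or error)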
async def run(self, topic: str, save_dir: Optional[str] = None, task_id: Optional[str] = None, max_parallel_browsers: int = 1) -> Dict[str, Any]:
"""
Starts the deep research process.
Args:
topic: The research topic.
save_dir: Optional directory to save outputs for this task. If None, operates in memory-only mode.
task_id: Optional existing task ID to resume. If None, a new ID is generated.
max_parallel_browsers: Max parallel browsers for the search tool.
Returns:
A dictionary containing the final status, message, task_id, and final_state.
"""
if self.runner and not self.runner.done():
logger.warning("Agent is already running. Please stop the current task first.")
return {"status": "error", "message": "Agent already running.", "task_id": self.current_task_id}
self.current_task_id = task_id if task_id else str(uuid.uuid4())
output_dir = None
if save_dir:
output_dir = os.path.join(save_dir, self.current_task_id)
os.makedirs(output_dir, exist_ok=True)
logger.info(f"[AsyncGen] Output directory: {output_dir}")
else:
logger.info(f"[AsyncGen] Running in memory-only mode (no save_dir provided)")
logger.info(f"[AsyncGen] Starting research task ID: {self.current_task_id} for topic: '{topic}'")
self.stop_event = threading.Event()
_AGENT_STOP_FLAGS[self.current_task_id] = self.stop_event
agent_tools = await self._setup_tools(self.current_task_id, self.stop_event, max_parallel_browsers)
initial_state: DeepResearchState = {
"task_id": self.current_task_id,
"topic": topic,
"research_plan": [],
"search_results": [],
"messages": [],
"llm": self.llm,
"tools": agent_tools,
"output_dir": output_dir,
"browser_config": self.browser_config,
"final_report": None,
"current_step_index": 0,
"stop_requested": False,
"error_message": None,
}
loaded_state = {}
if task_id and output_dir:
# Only try to resume from files if we have a task_id and output_dir
logger.info(f"Attempting to resume task {task_id}...")
loaded_state = _load_previous_state(task_id, output_dir)
initial_state.update(loaded_state)
if loaded_state.get("research_plan"):
logger.info(
f"Resuming with {len(loaded_state['research_plan'])} plan steps and {len(loaded_state.get('search_results', []))} existing results.")
initial_state["topic"] = topic  # Use the newly provided topic even when resuming.
else:
logger.warning(f"Resume requested for {task_id}, but no previous plan found. Starting fresh.")
initial_state["current_step_index"] = 0
# --- Execute Graph using ainvoke ---
final_state = None
status = "unknown"
message = None
try:
logger.info(f"Invoking graph execution for task {self.current_task_id}...")
self.runner = asyncio.create_task(self.graph.ainvoke(initial_state))
final_state = await self.runner
logger.info(f"Graph execution finished for task {self.current_task_id}.")
# Determine status based on final state
if self.stop_event and self.stop_event.is_set():
status = "stopped"
message = "Research process was stopped by request."
logger.info(message)
elif final_state and final_state.get("error_message"):
status = "error"
message = final_state["error_message"]
logger.error(f"Graph execution completed with error: {message}")
elif final_state and final_state.get("final_report"):
status = "completed"
message = "Research process completed successfully."
logger.info(message)
else:
# If it ends without error/report (e.g., empty plan, stopped before synthesis)
status = "finished_incomplete"
message = "Research process finished, but may be incomplete (no final report generated)."
logger.warning(message)
except asyncio.CancelledError:
status = "cancelled"
message = f"Agent run task cancelled for {self.current_task_id}."
logger.info(message)
# final_state will remain None or the state before cancellation if checkpointing was used
except Exception as e:
status = "error"
message = f"Unhandled error during graph execution for {self.current_task_id}: {e}"
logger.error(message, exc_info=True)
# final_state will remain None or the state before the error
finally:
logger.info(f"Cleaning up resources for task {self.current_task_id}")
task_id_to_clean = self.current_task_id
self.stop_event = None
self.current_task_id = None
self.runner = None # Mark runner as finished
await self.close_mcp_client()  # No-op if no client; also resets self.mcp_client to None.
# Construct result with report_file_path if available
result = {
"status": status,
"message": message,
"task_id": task_id_to_clean, # Use the stored task_id
"final_state": final_state if final_state else {} # Return the final state dict
}
# Add report file path if we have an output directory and a final report was generated
if output_dir and final_state and final_state.get("final_report"):
report_path = os.path.join(output_dir, REPORT_FILENAME)
result["report_file_path"] = report_path
return result
async def _stop_lingering_browsers(self, task_id):
"""Attempts to stop any BrowserUseAgent instances associated with the task_id."""
keys_to_stop = [key for key in _BROWSER_AGENT_INSTANCES if key.startswith(f"{task_id}_")]
if not keys_to_stop:
return
logger.warning(
f"Found {len(keys_to_stop)} potentially lingering browser agents for task {task_id}. Attempting stop...")
for key in keys_to_stop:
agent_instance = _BROWSER_AGENT_INSTANCES.get(key)
try:
if agent_instance:
# Assuming BU agent has an async stop method
await agent_instance.stop()
logger.info(f"Called stop() on browser agent instance {key}")
except Exception as e:
logger.error(f"Error calling stop() on browser agent instance {key}: {e}")
async def stop(self):
"""Signals the currently running agent task to stop."""
if not self.current_task_id or not self.stop_event:
logger.info("No agent task is currently running.")
return
logger.info(f"Stop requested for task ID: {self.current_task_id}")
self.stop_event.set() # Signal the stop event
self.stopped = True
await self._stop_lingering_browsers(self.current_task_id)
def close(self):
self.stopped = False
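# Minimal end-to-end usage sketch (hypothetical; assumes a LangChain-compatible
# chat model bound to `llm`):
#   agent = DeepResearchAgent(llm=llm, browser_config={"headless": True})
#   result = asyncio.run(agent.run("State of WebAssembly tooling", save_dir="./research"))
#   print(result["status"], result.get("report_file_path"))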
```