#
tokens: 40093/50000 26/26 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .env.example
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── assets
│   └── header.png
├── CLAUDE.md
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── mcp_server_browser_use
│       ├── __init__.py
│       ├── __main__.py
│       ├── _internal
│       │   ├── __init__.py
│       │   ├── agent
│       │   │   ├── __init__.py
│       │   │   ├── browser_use
│       │   │   │   └── browser_use_agent.py
│       │   │   └── deep_research
│       │   │       └── deep_research_agent.py
│       │   ├── browser
│       │   │   ├── __init__.py
│       │   │   ├── custom_browser.py
│       │   │   └── custom_context.py
│       │   ├── controller
│       │   │   ├── __init__.py
│       │   │   └── custom_controller.py
│       │   └── utils
│       │       ├── __init__.py
│       │       ├── config.py
│       │       ├── llm_provider.py
│       │       ├── mcp_client.py
│       │       └── utils.py
│       ├── cli.py
│       ├── config.py
│       └── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.11

```

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
fail_fast: true

repos:
  - repo: https://github.com/pre-commit/mirrors-prettier
    rev: v3.1.0
    hooks:
      - id: prettier
        types_or: [yaml, json5]

  # - repo: https://github.com/astral-sh/ruff-pre-commit
  #   rev: v0.8.1
  #   hooks:
  #     - id: ruff-format
  #     - id: ruff
  #       args: [--fix, --exit-non-zero-on-fix]

  - repo: local
    hooks:
      - id: uv-lock-check
        name: Check uv.lock is up to date
        entry: uv lock --check
        language: system
        files: ^(pyproject\.toml|uv\.lock)$
        pass_filenames: false

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,macos,python

### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon


# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk

### macOS Patch ###
# iCloud generated files
*.icloud

### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml

# ruff
.ruff_cache/

# LSP config files
pyrightconfig.json

### VisualStudioCode ###
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets

# Local History for Visual Studio Code
.history/

# Built Visual Studio Code Extensions
*.vsix

### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide

# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,macos,python

# Custom rules (everything added below won't be overridden by 'Generate .gitignore File' if you use 'Update' option)

agent_history.gif
trace.json
recording.mp4
temp/
tmp/
.vscode/

```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# This is an example .env file. Copy it to .env and fill in your values.
# Lines starting with # are comments.

# === Main LLM Configuration (MCP_LLM_*) ===
# Select the primary LLM provider
# Options: openai, azure_openai, anthropic, google, mistral, ollama, deepseek, openrouter, alibaba, moonshot, unbound
MCP_LLM_PROVIDER=google
# Specify the model name for the selected provider
MCP_LLM_MODEL_NAME=gemini-2.5-flash-preview-04-17
# LLM temperature (0.0-2.0). Controls randomness.
MCP_LLM_TEMPERATURE=0.0
# Optional: Generic override for the LLM provider's base URL
# MCP_LLM_BASE_URL=
# Optional: Generic override for the LLM provider's API key. Takes precedence over provider-specific keys.
# MCP_LLM_API_KEY=

# --- Provider Specific API Keys (MCP_LLM_*) ---
# Required unless using Ollama locally without auth or generic MCP_LLM_API_KEY is set
# MCP_LLM_OPENAI_API_KEY=YOUR_OPENAI_API_KEY
# MCP_LLM_ANTHROPIC_API_KEY=YOUR_ANTHROPIC_API_KEY
# MCP_LLM_GOOGLE_API_KEY=YOUR_GOOGLE_API_KEY
# MCP_LLM_AZURE_OPENAI_API_KEY=YOUR_AZURE_OPENAI_API_KEY
# MCP_LLM_DEEPSEEK_API_KEY=YOUR_DEEPSEEK_API_KEY
# MCP_LLM_MISTRAL_API_KEY=YOUR_MISTRAL_API_KEY
# MCP_LLM_OPENROUTER_API_KEY=YOUR_OPENROUTER_API_KEY
# MCP_LLM_ALIBABA_API_KEY=YOUR_ALIBABA_API_KEY
# MCP_LLM_MOONSHOT_API_KEY=YOUR_MOONSHOT_API_KEY
# MCP_LLM_UNBOUND_API_KEY=YOUR_UNBOUND_API_KEY

# --- Provider Specific Endpoints (MCP_LLM_*) ---
# Optional: Override default API endpoints.
# MCP_LLM_OPENAI_ENDPOINT=https://api.openai.com/v1
# MCP_LLM_ANTHROPIC_ENDPOINT=https://api.anthropic.com
# MCP_LLM_AZURE_OPENAI_ENDPOINT=YOUR_AZURE_ENDPOINT # Required if using Azure, e.g., https://your-resource.openai.azure.com/
# MCP_LLM_AZURE_OPENAI_API_VERSION=2025-01-01-preview
# MCP_LLM_DEEPSEEK_ENDPOINT=https://api.deepseek.com
# MCP_LLM_MISTRAL_ENDPOINT=https://api.mistral.ai/v1
# MCP_LLM_OLLAMA_ENDPOINT=http://localhost:11434
# MCP_LLM_OPENROUTER_ENDPOINT=https://openrouter.ai/api/v1
# MCP_LLM_ALIBABA_ENDPOINT=https://dashscope.aliyuncs.com/compatible-mode/v1
# MCP_LLM_MOONSHOT_ENDPOINT=https://api.moonshot.cn/v1
# MCP_LLM_UNBOUND_ENDPOINT=https://api.getunbound.ai

# --- Ollama Specific (MCP_LLM_*) ---
# MCP_LLM_OLLAMA_NUM_CTX=32000
# MCP_LLM_OLLAMA_NUM_PREDICT=1024

# === Planner LLM Configuration (Optional, MCP_LLM_PLANNER_*) ===
# If you want to use a different LLM for planning tasks within agents.
# Defaults to main LLM settings if not specified.
# MCP_LLM_PLANNER_PROVIDER=
# MCP_LLM_PLANNER_MODEL_NAME=
# MCP_LLM_PLANNER_TEMPERATURE=
# MCP_LLM_PLANNER_BASE_URL=
# MCP_LLM_PLANNER_API_KEY= # Generic planner API key, or use provider-specific below
# MCP_LLM_PLANNER_OPENAI_API_KEY=
# ... (similar provider-specific keys and endpoints for planner if needed)

# === Browser Configuration (MCP_BROWSER_*) ===
# General browser headless mode (true/false)
MCP_BROWSER_HEADLESS=false
# General setting: disable browser security features (use cautiously) (true/false)
MCP_BROWSER_DISABLE_SECURITY=false
# Optional: Path to Chrome/Chromium executable
# MCP_BROWSER_BINARY_PATH=/usr/bin/chromium-browser
# Optional: Path to Chrome user data directory (for persistent sessions)
# MCP_BROWSER_USER_DATA_DIR=~/.config/google-chrome/Profile 1
MCP_BROWSER_WINDOW_WIDTH=1280
MCP_BROWSER_WINDOW_HEIGHT=1080
# Set to true to connect to user's browser via MCP_BROWSER_CDP_URL
MCP_BROWSER_USE_OWN_BROWSER=false
# Optional: Connect to existing Chrome via DevTools Protocol URL. Required if MCP_BROWSER_USE_OWN_BROWSER=true.
# MCP_BROWSER_CDP_URL=http://localhost:9222
# MCP_BROWSER_WSS_URL= # Optional: WSS URL if CDP URL is not sufficient
# Keep browser managed by server open between MCP tool calls (if MCP_BROWSER_USE_OWN_BROWSER=false)
MCP_BROWSER_KEEP_OPEN=false
# Optional: Directory to save Playwright trace files (useful for debugging). If not set, tracing to file is disabled.
# MCP_BROWSER_TRACE_PATH=./tmp/trace

# === Agent Tool Configuration (`run_browser_agent` tool, MCP_AGENT_TOOL_*) ===
MCP_AGENT_TOOL_MAX_STEPS=100
MCP_AGENT_TOOL_MAX_ACTIONS_PER_STEP=5
# Method for tool invocation ('auto', 'json_schema', 'function_calling')
MCP_AGENT_TOOL_TOOL_CALLING_METHOD=auto
MCP_AGENT_TOOL_MAX_INPUT_TOKENS=128000
# Enable vision capabilities (screenshot analysis)
MCP_AGENT_TOOL_USE_VISION=true
# Override general browser headless mode for this tool (true/false/empty for general setting)
# MCP_AGENT_TOOL_HEADLESS=
# Override general browser disable security for this tool (true/false/empty for general setting)
# MCP_AGENT_TOOL_DISABLE_SECURITY=
# Enable Playwright video recording (true/false)
MCP_AGENT_TOOL_ENABLE_RECORDING=false
# Optional: Path to save agent run video recordings. If not set, recording to file is disabled even if ENABLE_RECORDING=true.
# MCP_AGENT_TOOL_SAVE_RECORDING_PATH=./tmp/recordings
# Optional: Directory to save agent history JSON files. If not set, history saving is disabled.
# MCP_AGENT_TOOL_HISTORY_PATH=./tmp/agent_history

# === Deep Research Tool Configuration (`run_deep_research` tool, MCP_RESEARCH_TOOL_*) ===
MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS=3
# MANDATORY: Base directory to save research artifacts (report, results). Task ID will be appended.
# Example: MCP_RESEARCH_TOOL_SAVE_DIR=/mnt/data/research_outputs
# Example: MCP_RESEARCH_TOOL_SAVE_DIR=C:\\Users\\YourUser\\Documents\\ResearchData
MCP_RESEARCH_TOOL_SAVE_DIR=./tmp/deep_research

# === Path Configuration (MCP_PATHS_*) ===
# Optional: Directory for downloaded files. If not set, persistent downloads to a specific path are disabled.
# MCP_PATHS_DOWNLOADS=./tmp/downloads

# === Server Configuration (MCP_SERVER_*) ===
# Path for the server log file. Leave empty for stdout.
# MCP_SERVER_LOG_FILE=mcp_server_browser_use.log
# Logging level: DEBUG, INFO, WARNING, ERROR, CRITICAL
MCP_SERVER_LOGGING_LEVEL=INFO
# Enable/disable anonymized telemetry (true/false)
MCP_SERVER_ANONYMIZED_TELEMETRY=true
# Optional: JSON string for MCP client configuration for the controller
# MCP_SERVER_MCP_CONFIG='{"client_name": "mcp-browser-use-controller"}'

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
<img src="./assets/header.png" alt="Browser Use Web UI" width="full"/>

<br/>

# browser-use MCP server & CLI
[![Documentation](https://img.shields.io/badge/Documentation-📕-blue)](https://docs.browser-use.com)
[![License](https://img.shields.io/badge/License-MIT-green)](LICENSE)

> **Project Note**: This MCP server implementation builds upon the [browser-use/web-ui](https://github.com/browser-use/web-ui) foundation. Core browser automation logic and configuration patterns are adapted from the original project.

AI-driven browser automation server implementing the Model Context Protocol (MCP) for natural language browser control and web research. Also provides CLI access to its core functionalities.

<a href="https://glama.ai/mcp/servers/@Saik0s/mcp-browser-use"><img width="380" height="200" src="https://glama.ai/mcp/servers/@Saik0s/mcp-browser-use/badge" alt="Browser-Use MCP server" /></a>

## Features

-   🧠 **MCP Integration** - Full protocol implementation for AI agent communication.
-   🌐 **Browser Automation** - Page navigation, form filling, element interaction via natural language (`run_browser_agent` tool).
-   👁️ **Visual Understanding** - Optional screenshot analysis for vision-capable LLMs.
-   🔄 **State Persistence** - Option to manage a server browser session across multiple MCP calls or connect to user's browser.
-   🔌 **Multi-LLM Support** - Integrates with OpenAI, Anthropic, Azure, DeepSeek, Google, Mistral, Ollama, OpenRouter, Alibaba, Moonshot, Unbound AI.
-   🔍 **Deep Research Tool** - Dedicated tool for multi-step web research and report generation (`run_deep_research` tool).
-   ⚙️ **Environment Variable Configuration** - Fully configurable via environment variables using a structured Pydantic model.
-   🔗 **CDP Connection** - Ability to connect to and control a user-launched Chrome/Chromium instance via Chrome DevTools Protocol.
-   ⌨️ **CLI Interface** - Access core agent functionalities (`run_browser_agent`, `run_deep_research`) directly from the command line for testing and scripting.

## Quick Start

### The Essentials

1. Install UV - the rocket-powered Python installer:
`curl -LsSf https://astral.sh/uv/install.sh | sh`

2. Get Playwright browsers (required for automation):
`uvx --from mcp-server-browser-use@latest python -m playwright install`

### Integration Patterns

For MCP clients like Claude Desktop, add a server configuration that's as simple as:

```json
// Example 1: One-Line Latest Version (Always Fresh)
"mcpServers": {
    "browser-use": {
      "command": "uvx",
      "args": ["mcp-server-browser-use@latest"],
      "env": {
        "MCP_LLM_GOOGLE_API_KEY": "YOUR_KEY_HERE_IF_USING_GOOGLE",
        "MCP_LLM_PROVIDER": "google",
        "MCP_LLM_MODEL_NAME": "gemini-2.5-flash-preview-04-17",
        "MCP_BROWSER_HEADLESS": "true"
      }
    }
}
```

```json
// Example 2: Advanced Configuration with CDP
"mcpServers": {
    "browser-use": {
      "command": "uvx",
      "args": ["mcp-server-browser-use@latest"],
      "env": {
        "MCP_LLM_OPENROUTER_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENROUTER",
        "MCP_LLM_PROVIDER": "openrouter",
        "MCP_LLM_MODEL_NAME": "anthropic/claude-3.5-haiku",
        "MCP_LLM_TEMPERATURE": "0.4",

        "MCP_BROWSER_HEADLESS": "false",
        "MCP_BROWSER_WINDOW_WIDTH": "1440",
        "MCP_BROWSER_WINDOW_HEIGHT": "1080",
        "MCP_AGENT_TOOL_USE_VISION": "true",

        "MCP_RESEARCH_TOOL_SAVE_DIR": "/path/to/your/research",
        "MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS": "5",

        "MCP_PATHS_DOWNLOADS": "/path/to/your/downloads",

        "MCP_BROWSER_USE_OWN_BROWSER": "true",
        "MCP_BROWSER_CDP_URL": "http://localhost:9222",

        "MCP_AGENT_TOOL_HISTORY_PATH": "/path/to/your/history",

        "MCP_SERVER_LOGGING_LEVEL": "DEBUG",
        "MCP_SERVER_LOG_FILE": "/path/to/your/log/mcp_server_browser_use.log"
      }
    }
}
```

```json
// Example 3: Advanced Configuration with User Data and custom chrome path
"mcpServers": {
    "browser-use": {
      "command": "uvx",
      "args": ["mcp-server-browser-use@latest"],
      "env": {
        "MCP_LLM_OPENAI_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENAI",
        "MCP_LLM_PROVIDER": "openai",
        "MCP_LLM_MODEL_NAME": "gpt-4.1-mini",
        "MCP_LLM_TEMPERATURE": "0.2",

        "MCP_BROWSER_HEADLESS": "false",

        "MCP_BROWSER_BINARY_PATH": "/path/to/your/chrome/binary",
        "MCP_BROWSER_USER_DATA_DIR": "/path/to/your/user/data",
        "MCP_BROWSER_DISABLE_SECURITY": "true",
        "MCP_BROWSER_KEEP_OPEN": "true",
        "MCP_BROWSER_TRACE_PATH": "/path/to/your/trace",

        "MCP_AGENT_TOOL_HISTORY_PATH": "/path/to/your/history",

        "MCP_SERVER_LOGGING_LEVEL": "DEBUG",
        "MCP_SERVER_LOG_FILE": "/path/to/your/log/mcp_server_browser_use.log"
      }
    }
}
```

```json
// Example 4: Local Development Flow
"mcpServers": {
    "browser-use": {
      "command": "uv",
      "args": [
        "--directory",
        "/your/dev/path",
        "run",
        "mcp-server-browser-use"
      ],
      "env": {
        "MCP_LLM_OPENROUTER_API_KEY": "YOUR_KEY_HERE_IF_USING_OPENROUTER",
        "MCP_LLM_PROVIDER": "openrouter",
        "MCP_LLM_MODEL_NAME": "openai/gpt-4o-mini",
        "MCP_BROWSER_HEADLESS": "true",
      }
    }
}
```

**Key Insight:** The best configurations emerge from starting simple (Example 1). The .env.example file contains all possible dials.

## MCP Tools

This server exposes the following tools via the Model Context Protocol:

### Synchronous Tools (Wait for Completion)

1.  **`run_browser_agent`**
    *   **Description:** Executes a browser automation task based on natural language instructions and waits for it to complete. Uses settings from `MCP_AGENT_TOOL_*`, `MCP_LLM_*`, and `MCP_BROWSER_*` environment variables.
    *   **Arguments:**
        *   `task` (string, required): The primary task or objective.
    *   **Returns:** (string) The final result extracted by the agent or an error message. Agent history (JSON, optional GIF) saved if `MCP_AGENT_TOOL_HISTORY_PATH` is set.

2.  **`run_deep_research`**
    *   **Description:** Performs in-depth web research on a topic, generates a report, and waits for completion. Uses settings from `MCP_RESEARCH_TOOL_*`, `MCP_LLM_*`, and `MCP_BROWSER_*` environment variables. If `MCP_RESEARCH_TOOL_SAVE_DIR` is set, outputs are saved to a subdirectory within it; otherwise, operates in memory-only mode.
    *   **Arguments:**
        *   `research_task` (string, required): The topic or question for the research.
        *   `max_parallel_browsers` (integer, optional): Overrides `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS` from environment.
    *   **Returns:** (string) The generated research report in Markdown format, including the file path (if saved), or an error message.

## CLI Usage

This package also provides a command-line interface `mcp-browser-cli` for direct testing and scripting.

**Global Options:**
*   `--env-file PATH, -e PATH`: Path to a `.env` file to load configurations from.
*   `--log-level LEVEL, -l LEVEL`: Override the logging level (e.g., `DEBUG`, `INFO`).

**Commands:**

1.  **`mcp-browser-cli run-browser-agent [OPTIONS] TASK`**
    *   **Description:** Runs a browser agent task.
    *   **Arguments:**
        *   `TASK` (string, required): The primary task for the agent.
    *   **Example:**
        ```bash
        mcp-browser-cli run-browser-agent "Go to example.com and find the title." -e .env
        ```

2.  **`mcp-browser-cli run-deep-research [OPTIONS] RESEARCH_TASK`**
    *   **Description:** Performs deep web research.
    *   **Arguments:**
        *   `RESEARCH_TASK` (string, required): The topic or question for research.
    *   **Options:**
        *   `--max-parallel-browsers INTEGER, -p INTEGER`: Override `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS`.
    *   **Example:**
        ```bash
        mcp-browser-cli run-deep-research "What are the latest advancements in AI-driven browser automation?" --max-parallel-browsers 5 -e .env
        ```

All other configurations (LLM keys, paths, browser settings) are picked up from environment variables (or the specified `.env` file) as detailed in the Configuration section.

## Configuration (Environment Variables)

Configure the server and CLI using environment variables. You can set these in your system or place them in a `.env` file in the project root (use `--env-file` for CLI). Variables are structured with prefixes.

| Variable Group (Prefix)             | Example Variable                               | Description                                                                                                | Default Value                     |
| :---------------------------------- | :--------------------------------------------- | :--------------------------------------------------------------------------------------------------------- | :-------------------------------- |
| **Main LLM (MCP_LLM_)**             |                                                | Settings for the primary LLM used by agents.                                                               |                                   |
|                                     | `MCP_LLM_PROVIDER`                             | LLM provider. Options: `openai`, `azure_openai`, `anthropic`, `google`, `mistral`, `ollama`, etc.         | `openai`                          |
|                                     | `MCP_LLM_MODEL_NAME`                           | Specific model name for the provider.                                                                      | `gpt-4.1`                         |
|                                     | `MCP_LLM_TEMPERATURE`                          | LLM temperature (0.0-2.0).                                                                                 | `0.0`                             |
|                                     | `MCP_LLM_BASE_URL`                             | Optional: Generic override for LLM provider's base URL.                                                    | Provider-specific                 |
|                                     | `MCP_LLM_API_KEY`                              | Optional: Generic LLM API key (takes precedence).                                                          | -                                 |
|                                     | `MCP_LLM_OPENAI_API_KEY`                       | API Key for OpenAI (if provider is `openai`).                                                              | -                                 |
|                                     | `MCP_LLM_ANTHROPIC_API_KEY`                    | API Key for Anthropic.                                                                                     | -                                 |
|                                     | `MCP_LLM_GOOGLE_API_KEY`                       | API Key for Google AI (Gemini).                                                                            | -                                 |
|                                     | `MCP_LLM_AZURE_OPENAI_API_KEY`                 | API Key for Azure OpenAI.                                                                                  | -                                 |
|                                     | `MCP_LLM_AZURE_OPENAI_ENDPOINT`                | **Required if using Azure.** Your Azure resource endpoint.                                                 | -                                 |
|                                     | `MCP_LLM_OLLAMA_ENDPOINT`                      | Ollama API endpoint URL.                                                                                   | `http://localhost:11434`          |
|                                     | `MCP_LLM_OLLAMA_NUM_CTX`                       | Context window size for Ollama models.                                                                     | `32000`                           |
| **Planner LLM (MCP_LLM_PLANNER_)**  |                                                | Optional: Settings for a separate LLM for agent planning. Defaults to Main LLM if not set.                |                                   |
|                                     | `MCP_LLM_PLANNER_PROVIDER`                     | Planner LLM provider.                                                                                      | Main LLM Provider                 |
|                                     | `MCP_LLM_PLANNER_MODEL_NAME`                   | Planner LLM model name.                                                                                    | Main LLM Model                    |
| **Browser (MCP_BROWSER_)**          |                                                | General browser settings.                                                                                  |                                   |
|                                     | `MCP_BROWSER_HEADLESS`                         | Run browser without UI (general setting).                                                                  | `false`                           |
|                                     | `MCP_BROWSER_DISABLE_SECURITY`                 | Disable browser security features (general setting, use cautiously).                                       | `false`                           |
|                                     | `MCP_BROWSER_BINARY_PATH`                      | Path to Chrome/Chromium executable.                                                                        | -                                 |
|                                     | `MCP_BROWSER_USER_DATA_DIR`                    | Path to Chrome user data directory.                                                                        | -                                 |
|                                     | `MCP_BROWSER_WINDOW_WIDTH`                     | Browser window width (pixels).                                                                             | `1280`                            |
|                                     | `MCP_BROWSER_WINDOW_HEIGHT`                    | Browser window height (pixels).                                                                            | `1080`                            |
|                                     | `MCP_BROWSER_USE_OWN_BROWSER`                  | Connect to user's browser via CDP URL.                                                                     | `false`                           |
|                                     | `MCP_BROWSER_CDP_URL`                          | CDP URL (e.g., `http://localhost:9222`). Required if `MCP_BROWSER_USE_OWN_BROWSER=true`.                  | -                                 |
|                                     | `MCP_BROWSER_KEEP_OPEN`                        | Keep server-managed browser open between MCP calls (if `MCP_BROWSER_USE_OWN_BROWSER=false`).               | `false`                           |
|                                     | `MCP_BROWSER_TRACE_PATH`                       | Optional: Directory to save Playwright trace files. If not set, tracing to file is disabled.               | ` ` (empty, tracing disabled)     |
| **Agent Tool (MCP_AGENT_TOOL_)**    |                                                | Settings for the `run_browser_agent` tool.                                                                 |                                   |
|                                     | `MCP_AGENT_TOOL_MAX_STEPS`                     | Max steps per agent run.                                                                                   | `100`                             |
|                                     | `MCP_AGENT_TOOL_MAX_ACTIONS_PER_STEP`          | Max actions per agent step.                                                                                | `5`                               |
|                                     | `MCP_AGENT_TOOL_TOOL_CALLING_METHOD`           | Method for tool invocation ('auto', 'json_schema', 'function_calling').                                    | `auto`                            |
|                                     | `MCP_AGENT_TOOL_MAX_INPUT_TOKENS`              | Max input tokens for LLM context.                                                                          | `128000`                          |
|                                     | `MCP_AGENT_TOOL_USE_VISION`                    | Enable vision capabilities (screenshot analysis).                                                          | `true`                            |
|                                     | `MCP_AGENT_TOOL_HEADLESS`                      | Override `MCP_BROWSER_HEADLESS` for this tool (true/false/empty).                                          | ` ` (uses general)                |
|                                     | `MCP_AGENT_TOOL_DISABLE_SECURITY`              | Override `MCP_BROWSER_DISABLE_SECURITY` for this tool (true/false/empty).                                  | ` ` (uses general)                |
|                                     | `MCP_AGENT_TOOL_ENABLE_RECORDING`              | Enable Playwright video recording.                                                                         | `false`                           |
|                                     | `MCP_AGENT_TOOL_SAVE_RECORDING_PATH`           | Optional: Path to save recordings. If not set, recording to file is disabled even if `ENABLE_RECORDING=true`. | ` ` (empty, recording disabled)   |
|                                     | `MCP_AGENT_TOOL_HISTORY_PATH`                  | Optional: Directory to save agent history JSON files. If not set, history saving is disabled.              | ` ` (empty, history saving disabled) |
| **Research Tool (MCP_RESEARCH_TOOL_)** |                                             | Settings for the `run_deep_research` tool.                                                                 |                                   |
|                                     | `MCP_RESEARCH_TOOL_MAX_PARALLEL_BROWSERS`      | Max parallel browser instances for deep research.                                                          | `3`                               |
|                                     | `MCP_RESEARCH_TOOL_SAVE_DIR`                   | Optional: Base directory to save research artifacts. Task ID will be appended. If not set, operates in memory-only mode. | `None`                           |
| **Paths (MCP_PATHS_)**              |                                                | General path settings.                                                                                     |                                   |
|                                     | `MCP_PATHS_DOWNLOADS`                          | Optional: Directory for downloaded files. If not set, persistent downloads to a specific path are disabled.  | ` ` (empty, downloads disabled)  |
| **Server (MCP_SERVER_)**            |                                                | Server-specific settings.                                                                                  |                                   |
|                                     | `MCP_SERVER_LOG_FILE`                          | Path for the server log file. Empty for stdout.                                                            | ` ` (empty, logs to stdout)       |
|                                     | `MCP_SERVER_LOGGING_LEVEL`                     | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`).                                           | `ERROR`                           |
|                                     | `MCP_SERVER_ANONYMIZED_TELEMETRY`              | Enable/disable anonymized telemetry (`true`/`false`).                                                      | `true`                            |
|                                     | `MCP_SERVER_MCP_CONFIG`                        | Optional: JSON string for MCP client config used by the internal controller.                               | `null`                            |

**Supported LLM Providers (`MCP_LLM_PROVIDER`):**
`openai`, `azure_openai`, `anthropic`, `google`, `mistral`, `ollama`, `deepseek`, `openrouter`, `alibaba`, `moonshot`, `unbound`

*(Refer to `.env.example` for a comprehensive list of all supported environment variables and their specific provider keys/endpoints.)*

## Connecting to Your Own Browser (CDP)

Instead of having the server launch and manage its own browser instance, you can connect it to a Chrome/Chromium browser that you launch and manage yourself.

**Steps:**

1.  **Launch Chrome/Chromium with Remote Debugging Enabled:**
    Start the browser with the `--remote-debugging-port` flag, e.g., `google-chrome --remote-debugging-port=9222` (the executable name and path differ between macOS, Linux, and Windows).

2.  **Configure Environment Variables:**
    Set the following environment variables:
    ```dotenv
    MCP_BROWSER_USE_OWN_BROWSER=true
    MCP_BROWSER_CDP_URL=http://localhost:9222 # Use the same port
    # Optional: MCP_BROWSER_USER_DATA_DIR=/path/to/your/profile
    ```

3.  **Run the MCP Server or CLI:**
    Start the server (`uv run mcp-server-browser-use`) or CLI (`mcp-browser-cli ...`) as usual.

**Important Considerations:**
*   The browser launched with `--remote-debugging-port` must remain open.
*   Settings like `MCP_BROWSER_HEADLESS` and `MCP_BROWSER_KEEP_OPEN` are ignored when `MCP_BROWSER_USE_OWN_BROWSER=true`.

## Development

```bash
# Install dev dependencies and sync project deps
uv sync --dev

# Install playwright browsers
uv run playwright install

# Run MCP server with debugger (Example connecting to own browser via CDP)
# 1. Launch Chrome: google-chrome --remote-debugging-port=9222 --user-data-dir="optional/path/to/user/profile"
# 2. Run inspector command with environment variables:
npx @modelcontextprotocol/inspector@latest \
  -e MCP_LLM_GOOGLE_API_KEY=$GOOGLE_API_KEY \
  -e MCP_LLM_PROVIDER=google \
  -e MCP_LLM_MODEL_NAME=gemini-2.5-flash-preview-04-17 \
  -e MCP_BROWSER_USE_OWN_BROWSER=true \
  -e MCP_BROWSER_CDP_URL=http://localhost:9222 \
  -e MCP_RESEARCH_TOOL_SAVE_DIR=./tmp/dev_research_output \
  uv --directory . run mcp-server-browser-use

# Note: Change timeout in inspector's config panel if needed (default is 10 seconds)

# Run CLI example
# Create a .env file with your settings (including MCP_RESEARCH_TOOL_SAVE_DIR) or use environment variables
uv run mcp-browser-cli -e .env run-browser-agent "What is the title of example.com?"
uv run mcp-browser-cli -e .env run-deep-research "What is the best material for a pan for everyday use on amateur kitchen and dishwasher?"
```

## Troubleshooting

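-   **Configuration Error on Startup**: If the application fails to start with an error about a missing setting, ensure all **required** environment variables for your setup (e.g., `MCP_LLM_AZURE_OPENAI_ENDPOINT` when using Azure, or `MCP_BROWSER_CDP_URL` when `MCP_BROWSER_USE_OWN_BROWSER=true`) are set correctly in your environment or `.env` file. Note that `MCP_RESEARCH_TOOL_SAVE_DIR` is optional; without it, deep research operates in memory-only mode.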
-   **Browser Conflicts**: If *not* using CDP (`MCP_BROWSER_USE_OWN_BROWSER=false`), ensure no conflicting Chrome instances are running with the same user data directory if `MCP_BROWSER_USER_DATA_DIR` is specified.
-   **CDP Connection Issues**: If using `MCP_BROWSER_USE_OWN_BROWSER=true`:
    *   Verify Chrome was launched with `--remote-debugging-port`.
    *   Ensure the port in `MCP_BROWSER_CDP_URL` matches.
    *   Check firewalls and ensure the browser is running.
-   **API Errors**: Double-check API keys (`MCP_LLM_<PROVIDER>_API_KEY` or `MCP_LLM_API_KEY`) and endpoints (e.g., `MCP_LLM_AZURE_OPENAI_ENDPOINT` for Azure).
-   **Vision Issues**: Ensure `MCP_AGENT_TOOL_USE_VISION=true` and your LLM supports vision.
-   **Dependency Problems**: Run `uv sync` and `uv run playwright install`.
-   **File/Path Issues**:
    *   If optional features like history saving, tracing, or downloads are not working, ensure the corresponding path variables (`MCP_AGENT_TOOL_HISTORY_PATH`, `MCP_BROWSER_TRACE_PATH`, `MCP_PATHS_DOWNLOADS`) are set and the application has write permissions to those locations.
    *   For deep research, if you want outputs saved to disk, ensure `MCP_RESEARCH_TOOL_SAVE_DIR` is set to a valid, writable directory; if it is not set, the tool operates in memory-only mode.
-   **Logging**: Check the log file (`MCP_SERVER_LOG_FILE`, if set) or console output. Increase `MCP_SERVER_LOGGING_LEVEL` to `DEBUG` for more details. For CLI, use `--log-level DEBUG`.

## License

MIT - See [LICENSE](LICENSE) for details.

```

--------------------------------------------------------------------------------
/CLAUDE.md:
--------------------------------------------------------------------------------

```markdown
# Development Guidelines

This document contains critical information about working with this codebase. Follow these guidelines precisely.

## Core Development Rules

1. Package Management
   - ONLY use uv, NEVER pip
   - Installation: `uv add package`
   - Running tools: `uv run tool`
   - Upgrading: `uv add --dev package --upgrade-package package`
   - FORBIDDEN: `uv pip install`, `@latest` syntax

2. Code Quality
   - Type hints required for all code
   - Public APIs must have docstrings
   - Functions must be focused and small
   - Follow existing patterns exactly
   - Line length: 150 chars maximum

3. Testing Requirements
   - Framework: `uv run pytest`
   - Async testing: use anyio, not asyncio
   - Coverage: test edge cases and errors
   - New features require tests
   - Bug fixes require regression tests

## Python Tools

## Code Formatting

1. Ruff
   - Format: `uv run ruff format .`
   - Check: `uv run ruff check .`
   - Fix: `uv run ruff check . --fix`
   - Critical issues:
     - Line length (150 chars)
     - Import sorting (I001)
     - Unused imports
   - Line wrapping:
     - Strings: use parentheses
     - Function calls: multi-line with proper indent
     - Imports: split into multiple lines

2. Type Checking
   - Tool: `uv run pyright`
   - Requirements:
     - Explicit None checks for Optional
     - Type narrowing for strings
     - Version warnings can be ignored if checks pass

3. Pre-commit
   - Config: `.pre-commit-config.yaml`
   - Runs: on git commit
   - Tools: Prettier (YAML/JSON), Ruff (Python)
   - Ruff updates:
     - Check PyPI versions
     - Update config rev
     - Commit config first

## Error Resolution

1. CI Failures
   - Fix order:
     1. Formatting
     2. Type errors
     3. Linting
   - Type errors:
     - Get full line context
     - Check Optional types
     - Add type narrowing
     - Verify function signatures

2. Common Issues
   - Line length (150 chars):
     - Break strings with parentheses
     - Multi-line function calls
     - Split imports
   - Types:
     - Add None checks
     - Narrow string types
     - Match existing patterns

3. Best Practices
   - Check git status before commits
   - Run formatters before type checks
   - Keep changes minimal
   - Follow existing patterns
   - Document public APIs
   - Test thoroughly

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/controller/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/__init__.py:
--------------------------------------------------------------------------------

```python
# Import the server entry point; this also makes `main` available as `mcp_server_browser_use.main`.
from mcp_server_browser_use.server import main

# True only when this file itself is executed as a script; a regular package import skips the call.
if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/__main__.py:
--------------------------------------------------------------------------------

```python
# Module run by `python -m mcp_server_browser_use`; it simply delegates to the server's `main`.
from mcp_server_browser_use.server import main

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/utils.py:
--------------------------------------------------------------------------------

```python
import base64
import os
import time
from pathlib import Path
from typing import Dict, Optional
import requests
import json
import uuid


def encode_image(img_path):
    """Read a file and return its contents as a base64 string.

    Args:
        img_path: Path of the file to read. A falsy value (``None``, ``""``) means "no image".

    Returns:
        The file's bytes base64-encoded and decoded to a UTF-8 ``str``,
        or ``None`` when ``img_path`` is falsy.
    """
    if img_path:
        with open(img_path, "rb") as image_file:
            raw_bytes = image_file.read()
        return base64.b64encode(raw_bytes).decode("utf-8")
    return None


def get_latest_files(directory: str, file_types: Optional[list] = None) -> Dict[str, Optional[str]]:
    """Find the most recently modified file for each extension under ``directory``.

    The directory is searched recursively. A file is only reported once its
    modification time is more than one second old, so files that are still
    being written are skipped.

    Args:
        directory: Root directory to search. If it does not exist it is created
            and an all-``None`` result is returned.
        file_types: File-name suffixes to look for. Defaults to
            ``['.webm', '.zip']`` (recordings and traces).

    Returns:
        A dict mapping every requested suffix to the path (as ``str``) of the
        newest matching file, or ``None`` when there is no settled match or the
        lookup for that suffix failed.
    """
    if file_types is None:
        # Built per call so no list object is shared between invocations.
        file_types = ['.webm', '.zip']

    latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}

    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
        return latest_files

    root = Path(directory)
    for file_type in file_types:
        try:
            # Stat each candidate exactly once, so the mtime used to rank the files
            # is the same one used for the "finished writing" check below.
            stamped = [(p.stat().st_mtime, p) for p in root.rglob(f"*{file_type}")]
            if not stamped:
                continue
            newest_mtime, newest = max(stamped, key=lambda pair: pair[0])
            # Only return files that are complete (not being written)
            if time.time() - newest_mtime > 1.0:
                latest_files[file_type] = str(newest)
        except Exception as e:
            # Deliberately best-effort: a failure for one suffix must not abort the others.
            print(f"Error getting latest {file_type} file: {e}")

    return latest_files

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp_server_browser_use"
version = "0.1.8"
description = "MCP server for browser-use"
readme = "README.md"
requires-python = ">=3.11"
authors = [{ name = "Igor Tarasenko" }]
license = { text = "MIT" }
classifiers = [
  "Development Status :: 4 - Beta",
  "Programming Language :: Python :: 3",
  "Programming Language :: Python :: 3.11",
  "Operating System :: OS Independent",
]

dependencies = [
  "pydantic-settings>=2.0.0",
  "mcp>=1.6.0",
  "typer>=0.12.0",
  "browser-use==0.1.41",
  "pyperclip==1.9.0",
  "json-repair",
  "langchain-mistralai==0.2.4",
  "MainContentExtractor==0.0.4",
  "langchain-ibm==0.3.10",
  "langchain_mcp_adapters==0.0.9",
  "langgraph==0.3.34",
  "langchain-community",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_browser_use"]

[project.scripts]
mcp-server-browser-use = "mcp_server_browser_use.server:main"
mcp-browser-cli = "mcp_server_browser_use.cli:app"

[tool.pyright]
include = ["src/mcp_server_browser_use"]
venvPath = "."
venv = ".venv"

[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []

[tool.ruff]
line-length = 150
target-version = "py311"

[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/config.py:
--------------------------------------------------------------------------------

```python
# Human-readable display name for each LLM provider identifier.
# NOTE(review): `model_names` in this module also lists "ollama", "mistral" and "siliconflow",
# which have no entry here -- confirm whether lookups for those providers need a fallback.
PROVIDER_DISPLAY_NAMES = {
    "openai": "OpenAI",
    "azure_openai": "Azure OpenAI",
    "anthropic": "Anthropic",
    "deepseek": "DeepSeek",
    "google": "Google",
    "alibaba": "Alibaba",
    "moonshot": "MoonShot",
    "unbound": "Unbound AI",
    "ibm": "IBM",
}

# Predefined model names for common providers, keyed by provider identifier.
# Each value is a list of model-name strings for that provider.
model_names = {
    "anthropic": [
        "claude-3-5-sonnet-20241022",
        "claude-3-5-sonnet-20240620",
        "claude-3-opus-20240229",
    ],
    "openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo", "o3-mini"],
    "deepseek": ["deepseek-chat", "deepseek-reasoner"],
    "google": [
        "gemini-2.0-flash",
        "gemini-2.0-flash-thinking-exp",
        "gemini-1.5-flash-latest",
        "gemini-1.5-flash-8b-latest",
        "gemini-2.0-flash-thinking-exp-01-21",
        "gemini-2.0-pro-exp-02-05",
        "gemini-2.5-pro-preview-03-25",
        "gemini-2.5-flash-preview-04-17",
    ],
    "ollama": [
        "qwen2.5:7b",
        "qwen2.5:14b",
        "qwen2.5:32b",
        "qwen2.5-coder:14b",
        "qwen2.5-coder:32b",
        "llama2:7b",
        "deepseek-r1:14b",
        "deepseek-r1:32b",
    ],
    "azure_openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"],
    "mistral": [
        "pixtral-large-latest",
        "mistral-large-latest",
        "mistral-small-latest",
        "ministral-8b-latest",
    ],
    "alibaba": [
        "qwen-plus",
        "qwen-max",
        "qwen-vl-max",
        "qwen-vl-plus",
        "qwen-turbo",
        "qwen-long",
    ],
    "moonshot": [
        "moonshot-v1-32k-vision-preview",
        "moonshot-v1-8k-vision-preview",
    ],
    "unbound": ["gemini-2.0-flash", "gpt-4o-mini", "gpt-4o", "gpt-4.5-preview"],
    "siliconflow": [
        "deepseek-ai/DeepSeek-R1",
        "deepseek-ai/DeepSeek-V3",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
        "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
        "deepseek-ai/DeepSeek-V2.5",
        "deepseek-ai/deepseek-vl2",
        "Qwen/Qwen2.5-72B-Instruct-128K",
        "Qwen/Qwen2.5-72B-Instruct",
        "Qwen/Qwen2.5-32B-Instruct",
        "Qwen/Qwen2.5-14B-Instruct",
        "Qwen/Qwen2.5-7B-Instruct",
        "Qwen/Qwen2.5-Coder-32B-Instruct",
        "Qwen/Qwen2.5-Coder-7B-Instruct",
        "Qwen/Qwen2-7B-Instruct",
        "Qwen/Qwen2-1.5B-Instruct",
        "Qwen/QwQ-32B-Preview",
        "Qwen/Qwen2-VL-72B-Instruct",
        "Qwen/Qwen2.5-VL-32B-Instruct",
        "Qwen/Qwen2.5-VL-72B-Instruct",
        "TeleAI/TeleChat2",
        "THUDM/glm-4-9b-chat",
        "Vendor-A/Qwen/Qwen2.5-72B-Instruct",
        "internlm/internlm2_5-7b-chat",
        "internlm/internlm2_5-20b-chat",
        "Pro/Qwen/Qwen2.5-7B-Instruct",
        "Pro/Qwen/Qwen2-7B-Instruct",
        "Pro/Qwen/Qwen2-1.5B-Instruct",
        "Pro/THUDM/chatglm3-6b",
        "Pro/THUDM/glm-4-9b-chat",
    ],
    "ibm": [
        "ibm/granite-vision-3.1-2b-preview",
        "meta-llama/llama-4-maverick-17b-128e-instruct-fp8",
        "meta-llama/llama-3-2-90b-vision-instruct",
    ],
}

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/custom_context.py:
--------------------------------------------------------------------------------

```python
import json
import logging
import os

from browser_use.browser.browser import Browser, IN_DOCKER
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from playwright.async_api import Browser as PlaywrightBrowser
from playwright.async_api import BrowserContext as PlaywrightBrowserContext
from typing import Optional
from browser_use.browser.context import BrowserContextState

logger = logging.getLogger(__name__)


class CustomBrowserContextConfig(BrowserContextConfig):
    """``BrowserContextConfig`` extended with a flag that forces creation of a fresh context."""

    # Read by CustomBrowserContext._create_context: when False, the browser's first existing
    # context is reused if a CDP URL or browser binary path is configured and a context exists;
    # when True, a new context is always created.
    force_new_context: bool = False  # force to create new context


class CustomBrowserContext(BrowserContext):
    """BrowserContext that can reuse an existing Playwright context and installs an init script."""

    def __init__(
            self,
            browser: 'Browser',
            config: BrowserContextConfig | None = None,
            state: Optional[BrowserContextState] = None,
    ):
        """Forward all arguments unchanged to the BrowserContext constructor."""
        super().__init__(browser=browser, config=config, state=state)

    async def _create_context(self, browser: PlaywrightBrowser):
        """Return the Playwright context to use, reusing an existing one when allowed.

        The browser's first existing context is reused when ``force_new_context``
        is false, the browser config has a ``cdp_url`` or ``browser_binary_path``,
        and the Playwright browser already has at least one context. Otherwise a
        new context is created from the context config.

        Afterwards tracing is started if ``trace_path`` is set, cookies are loaded
        from ``cookies_file`` if that file exists, and an init script is added that
        overrides several ``navigator``/``window`` properties and forces shadow
        roots to be open.

        Args:
            browser: The Playwright browser that owns (or will own) the context.

        Returns:
            The reused or newly created Playwright browser context.
        """
        # Both a CDP connection and a user-supplied browser binary take the reuse path.
        attached_to_external_browser = self.browser.config.cdp_url or self.browser.config.browser_binary_path
        if not self.config.force_new_context and attached_to_external_browser and len(browser.contexts) > 0:
            # Connect to existing Chrome instance instead of creating new one
            context = browser.contexts[0]
        else:
            context = await browser.new_context(
                no_viewport=True,
                user_agent=self.config.user_agent,
                java_script_enabled=True,
                bypass_csp=self.config.disable_security,
                ignore_https_errors=self.config.disable_security,
                record_video_dir=self.config.save_recording_path,
                record_video_size=self.config.browser_window_size.model_dump(),
                record_har_path=self.config.save_har_path,
                locale=self.config.locale,
                http_credentials=self.config.http_credentials,
                is_mobile=self.config.is_mobile,
                has_touch=self.config.has_touch,
                geolocation=self.config.geolocation,
                permissions=self.config.permissions,
                timezone_id=self.config.timezone_id,
            )

        if self.config.trace_path:
            await context.tracing.start(screenshots=True, snapshots=True, sources=True)

        # Load cookies if they exist
        cookies_file = self.config.cookies_file
        if cookies_file and os.path.exists(cookies_file):
            try:
                # JSON text is UTF-8; do not depend on the platform's default encoding.
                # The file is closed before any await so the handle is not held across I/O.
                with open(cookies_file, 'r', encoding='utf-8') as f:
                    cookies = json.load(f)
            except json.JSONDecodeError as e:
                logger.error(f'Failed to parse cookies file: {str(e)}')
            else:
                valid_same_site_values = ['Strict', 'Lax', 'None']
                for cookie in cookies:
                    # Any unrecognised sameSite value is rewritten to 'None' before the cookies are added.
                    if 'sameSite' in cookie and cookie['sameSite'] not in valid_same_site_values:
                        logger.warning(
                            f"Fixed invalid sameSite value '{cookie['sameSite']}' to 'None' for cookie {cookie.get('name')}"
                        )
                        cookie['sameSite'] = 'None'
                logger.info(f'🍪  Loaded {len(cookies)} cookies from {self.config.cookies_file}')
                await context.add_cookies(cookies)

        # Expose anti-detection scripts
        await context.add_init_script(
            """
            // Webdriver property
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });

            // Languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US']
            });

            // Plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });

            // Chrome runtime
            window.chrome = { runtime: {} };

            // Permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
            (function () {
                const originalAttachShadow = Element.prototype.attachShadow;
                Element.prototype.attachShadow = function attachShadow(options) {
                    return originalAttachShadow.call(this, { ...options, mode: "open" });
                };
            })();
            """
        )

        return context

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/browser/custom_browser.py:
--------------------------------------------------------------------------------

```python
import asyncio
import gc
import pdb

from playwright.async_api import Browser as PlaywrightBrowser
from playwright.async_api import (
    BrowserContext as PlaywrightBrowserContext,
)
from playwright.async_api import (
    Playwright,
    async_playwright,
)
from browser_use.browser.browser import Browser, IN_DOCKER
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from playwright.async_api import BrowserContext as PlaywrightBrowserContext
import logging

from browser_use.browser.chrome import (
    CHROME_ARGS,
    CHROME_DETERMINISTIC_RENDERING_ARGS,
    CHROME_DISABLE_SECURITY_ARGS,
    CHROME_DOCKER_ARGS,
    CHROME_HEADLESS_ARGS,
)
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.utils.screen_resolution import get_screen_resolution, get_window_adjustments
from browser_use.utils import time_execution_async
import socket

from .custom_context import CustomBrowserContext, CustomBrowserContextConfig

logger = logging.getLogger(__name__)


class CustomBrowser(Browser):
    """Browser subclass that creates CustomBrowserContext objects and customises launch and shutdown."""

    async def new_context(self, config: CustomBrowserContextConfig | None = None) -> CustomBrowserContext:
        """Create a browser context.

        Args:
            config: Optional context config; its dumped fields are overlaid on the
                dumped browser config before building a CustomBrowserContextConfig.

        Returns:
            A CustomBrowserContext bound to this browser.
        """
        browser_config = self.config.model_dump() if self.config else {}
        context_config = config.model_dump() if config else {}
        # On key collisions the context-level value wins over the browser-level one.
        merged_config = {**browser_config, **context_config}
        return CustomBrowserContext(config=CustomBrowserContextConfig(**merged_config), browser=self)

    async def _setup_builtin_browser(self, playwright: Playwright) -> PlaywrightBrowser:
        """Launch one of Playwright's bundled browsers with the configured arguments.

        Args:
            playwright: The started Playwright instance to launch the browser from.

        Returns:
            The launched Playwright browser.

        Raises:
            AssertionError: If ``browser_binary_path`` is set in the browser config.
        """
        assert self.config.browser_binary_path is None, 'browser_binary_path should be None if trying to use the builtin browsers'

        if self.config.headless:
            screen_size = {'width': 1920, 'height': 1080}
            offset_x, offset_y = 0, 0
        else:
            screen_size = get_screen_resolution()
            offset_x, offset_y = get_window_adjustments()

        # A set, so duplicate flags coming from the different sources collapse to one.
        chrome_args = {
            *CHROME_ARGS,
            *(CHROME_DOCKER_ARGS if IN_DOCKER else []),
            *(CHROME_HEADLESS_ARGS if self.config.headless else []),
            *(CHROME_DISABLE_SECURITY_ARGS if self.config.disable_security else []),
            *(CHROME_DETERMINISTIC_RENDERING_ARGS if self.config.deterministic_rendering else []),
            f'--window-position={offset_x},{offset_y}',
            *self.config.extra_browser_args,
        }

        # Only add a default window size when the caller did not pass one explicitly.
        if not any('--window-size' in arg for arg in self.config.extra_browser_args):
            chrome_args.add(f'--window-size={screen_size["width"]},{screen_size["height"]}')

        # check if port 9222 is already taken, if so remove the remote-debugging-port arg to prevent conflicts
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            if s.connect_ex(('localhost', 9222)) == 0:
                # discard() rather than remove(): the flag may not be in the set,
                # and remove() would raise KeyError in that case.
                chrome_args.discard('--remote-debugging-port=9222')

        browser_class = getattr(playwright, self.config.browser_class)
        args = {
            'chromium': list(chrome_args),
            'firefox': [
                *{
                    '-no-remote',
                    *self.config.extra_browser_args,
                }
            ],
            'webkit': [
                *{
                    '--no-startup-window',
                    *self.config.extra_browser_args,
                }
            ],
        }

        browser = await browser_class.launch(
            headless=self.config.headless,
            args=args[self.config.browser_class],
            proxy=self.config.proxy.model_dump() if self.config.proxy else None,
            handle_sigterm=False,
            handle_sigint=False,
        )
        return browser

    async def _close_without_httpxclients(self):
        """Close the Playwright browser, stop Playwright and kill any tracked Chrome subprocess.

        Does nothing when ``keep_alive`` is set. Failures are logged at debug level
        and never propagate; the handles are reset to None in every case.
        """
        if self.config.keep_alive:
            return

        try:
            if self.playwright_browser:
                await self.playwright_browser.close()
                del self.playwright_browser
            if self.playwright:
                await self.playwright.stop()
                del self.playwright
            if chrome_proc := getattr(self, '_chrome_subprocess', None):
                try:
                    # always kill all children processes, otherwise chrome leaves a bunch of zombie processes
                    for proc in chrome_proc.children(recursive=True):
                        proc.kill()
                    chrome_proc.kill()
                except Exception as e:
                    logger.debug(f'Failed to terminate chrome subprocess: {e}')

        except Exception as e:
            logger.debug(f'Failed to close browser properly: {e}')

        finally:
            # Re-create the attributes (possibly deleted above) so later access sees None.
            self.playwright_browser = None
            self.playwright = None
            self._chrome_subprocess = None
            gc.collect()

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/browser_use/browser_use_agent.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

import asyncio
import gc
import inspect
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, TypeVar, Union

from dotenv import load_dotenv
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    BaseMessage,
    HumanMessage,
    SystemMessage,
)

# from lmnr.sdk.decorators import observe
from pydantic import BaseModel, ValidationError

from browser_use.agent.gif import create_history_gif
from browser_use.agent.memory.service import Memory, MemorySettings
from browser_use.agent.message_manager.service import MessageManager, MessageManagerSettings
from browser_use.agent.message_manager.utils import convert_input_messages, extract_json_from_model_output, save_conversation
from browser_use.agent.prompts import AgentMessagePrompt, PlannerPrompt, SystemPrompt
from browser_use.agent.views import (
    REQUIRED_LLM_API_ENV_VARS,
    ActionResult,
    AgentError,
    AgentHistory,
    AgentHistoryList,
    AgentOutput,
    AgentSettings,
    AgentState,
    AgentStepInfo,
    StepMetadata,
    ToolCallingMethod,
)
from browser_use.browser.browser import Browser
from browser_use.browser.context import BrowserContext
from browser_use.browser.views import BrowserState, BrowserStateHistory
from browser_use.controller.registry.views import ActionModel
from browser_use.controller.service import Controller
from browser_use.dom.history_tree_processor.service import (
    DOMHistoryElement,
    HistoryTreeProcessor,
)
from browser_use.exceptions import LLMException
from browser_use.telemetry.service import ProductTelemetry
from browser_use.telemetry.views import (
    AgentEndTelemetryEvent,
    AgentRunTelemetryEvent,
    AgentStepTelemetryEvent,
)
from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync
from browser_use.agent.service import Agent, AgentHookFunc

load_dotenv()
logger = logging.getLogger(__name__)

# True when the SKIP_LLM_API_KEY_VERIFICATION env var starts with 't', 'y' or '1'
# (case-insensitive). Slicing with [:1] instead of indexing with [0] keeps an
# empty value from raising IndexError at import time; it is treated as false.
SKIP_LLM_API_KEY_VERIFICATION = os.environ.get('SKIP_LLM_API_KEY_VERIFICATION', 'false').lower()[:1] in ('t', 'y', '1')


class BrowserUseAgent(Agent):
    """Agent subclass that provides its own ``run`` loop."""

    @time_execution_async('--run (agent)')
    async def run(
            self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None,
            on_step_end: AgentHookFunc | None = None
    ) -> AgentHistoryList:
        """Execute the task with maximum number of steps.

        Args:
            max_steps: Upper bound on the number of steps to execute.
            on_step_start: Optional hook awaited with this agent before each step.
            on_step_end: Optional hook awaited with this agent after each step.

        Returns:
            The agent's history (``self.state.history``), also when the run is
            interrupted by ``KeyboardInterrupt``.
        """

        # run() is a coroutine, so a loop is always running here; get_running_loop()
        # is the documented way to obtain it from inside a coroutine.
        loop = asyncio.get_running_loop()

        # Set up the Ctrl+C signal handler with callbacks specific to this agent
        from browser_use.utils import SignalHandler

        signal_handler = SignalHandler(
            loop=loop,
            pause_callback=self.pause,
            resume_callback=self.resume,
            custom_exit_callback=None,  # No special cleanup needed on forced exit
            exit_on_second_int=True,
        )
        signal_handler.register()

        # Wait for verification task to complete if it exists
        verification_task = getattr(self, '_verification_task', None)
        if verification_task and not verification_task.done():
            try:
                await verification_task
            except Exception:
                # Error already logged in the task
                pass

        try:
            self._log_agent_run()

            # Execute initial actions if provided
            if self.initial_actions:
                result = await self.multi_act(self.initial_actions, check_for_new_elements=False)
                self.state.last_result = result

            for step in range(max_steps):
                # Check if waiting for user input after Ctrl+C
                while self.state.paused:
                    await asyncio.sleep(0.5)
                    if self.state.stopped:
                        break

                # Check if we should stop due to too many failures
                if self.state.consecutive_failures >= self.settings.max_failures:
                    logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures')
                    break

                # Check control flags before each step
                if self.state.stopped:
                    logger.info('Agent stopped')
                    break

                while self.state.paused:
                    await asyncio.sleep(0.2)  # Small delay to prevent CPU spinning
                    if self.state.stopped:  # Allow stopping while paused
                        break

                if on_step_start is not None:
                    await on_step_start(self)

                step_info = AgentStepInfo(step_number=step, max_steps=max_steps)
                await self.step(step_info)

                if on_step_end is not None:
                    await on_step_end(self)

                if self.state.history.is_done():
                    # A failed output validation sends the loop into another step,
                    # unless this was already the last allowed step.
                    if self.settings.validate_output and step < max_steps - 1:
                        if not await self._validate_output():
                            continue

                    await self.log_completion()
                    break
            else:
                # for/else: reached only when the loop ran out of steps without a break.
                logger.info('❌ Failed to complete task in maximum steps')

            return self.state.history

        except KeyboardInterrupt:
            # Already handled by our signal handler, but catch any direct KeyboardInterrupt as well
            logger.info('Got KeyboardInterrupt during execution, returning current history')
            return self.state.history

        finally:
            # Unregister signal handlers before cleanup
            signal_handler.unregister()

            self.telemetry.capture(
                AgentEndTelemetryEvent(
                    agent_id=self.state.agent_id,
                    is_done=self.state.history.is_done(),
                    success=self.state.history.is_successful(),
                    steps=self.state.n_steps,
                    max_steps_reached=self.state.n_steps >= max_steps,
                    errors=self.state.history.errors(),
                    total_input_tokens=self.state.history.total_input_tokens(),
                    total_duration_seconds=self.state.history.total_duration_seconds(),
                )
            )

            await self.close()

            if self.settings.generate_gif:
                # generate_gif may be a bool (use the default file name) or a str (custom output path).
                output_path: str = 'agent_history.gif'
                if isinstance(self.settings.generate_gif, str):
                    output_path = self.settings.generate_gif

                create_history_gif(task=self.task, history=self.state.history, output_path=output_path)
```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/controller/custom_controller.py:
--------------------------------------------------------------------------------

```python
import pdb

import pyperclip
from typing import Optional, Type, Callable, Dict, Any, Union, Awaitable, TypeVar
from pydantic import BaseModel
from browser_use.agent.views import ActionResult
from browser_use.browser.context import BrowserContext
from browser_use.controller.service import Controller, DoneAction
from browser_use.controller.registry.service import Registry, RegisteredAction
from main_content_extractor import MainContentExtractor
from browser_use.controller.views import (
    ClickElementAction,
    DoneAction,
    ExtractPageContentAction,
    GoToUrlAction,
    InputTextAction,
    OpenTabAction,
    ScrollAction,
    SearchGoogleAction,
    SendKeysAction,
    SwitchTabAction,
)
import logging
import inspect
import asyncio
import os
from langchain_core.language_models.chat_models import BaseChatModel
from browser_use.agent.views import ActionModel, ActionResult

from ..utils.mcp_client import create_tool_param_model, setup_mcp_client_and_tools

from browser_use.utils import time_execution_async, time_execution_sync

logger = logging.getLogger(__name__)

Context = TypeVar('Context')


class CustomController(Controller):
    """Controller with extra custom actions and optional MCP tool support."""

    def __init__(self, exclude_actions: Optional[list[str]] = None,
                 output_model: Optional[Type[BaseModel]] = None,
                 ask_assistant_callback: Optional[Union[Callable[[str, BrowserContext], Dict[str, Any]], Callable[
                     [str, BrowserContext], Awaitable[Dict[str, Any]]]]] = None,
                 ):
        """Initialise the controller and register the custom actions.

        Args:
            exclude_actions: Action names forwarded to the base Controller; ``None``
                (the default) is treated as an empty list.
            output_model: Optional model forwarded to the base Controller.
            ask_assistant_callback: Optional sync or async callable invoked as
                ``callback(query, browser)`` by the ``ask_for_assistant`` action; its
                result must support ``['response']``.
        """
        # A fresh list per instance avoids sharing one mutable default across controllers.
        super().__init__(exclude_actions=exclude_actions if exclude_actions is not None else [],
                         output_model=output_model)
        self._register_custom_actions()
        self.ask_assistant_callback = ask_assistant_callback
        self.mcp_client = None
        self.mcp_server_config = None

    def _register_custom_actions(self):
        """Register all custom browser actions"""

        @self.registry.action(
            "When executing tasks, prioritize autonomous completion. However, if you encounter a definitive blocker "
            "that prevents you from proceeding independently – such as needing credentials you don't possess, "
            "requiring subjective human judgment, needing a physical action performed, encountering complex CAPTCHAs, "
            "or facing limitations in your capabilities – you must request human assistance."
        )
        async def ask_for_assistant(query: str, browser: BrowserContext):
            # The callback is looked up at call time, so it may be assigned after registration.
            if self.ask_assistant_callback:
                if inspect.iscoroutinefunction(self.ask_assistant_callback):
                    user_response = await self.ask_assistant_callback(query, browser)
                else:
                    user_response = self.ask_assistant_callback(query, browser)
                msg = f"AI ask: {query}. User response: {user_response['response']}"
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            else:
                return ActionResult(extracted_content="Human cannot help you. Please try another way.",
                                    include_in_memory=True)

        @self.registry.action(
            'Upload file to interactive element with file path ',
        )
        async def upload_file(index: int, path: str, browser: BrowserContext, available_file_paths: list[str]):
            # Only paths explicitly made available may be uploaded.
            if path not in available_file_paths:
                return ActionResult(error=f'File path {path} is not available')

            if not os.path.exists(path):
                return ActionResult(error=f'File {path} does not exist')

            dom_el = await browser.get_dom_element_by_index(index)

            file_upload_dom_el = dom_el.get_file_upload_element()

            if file_upload_dom_el is None:
                msg = f'No file upload element found at index {index}'
                logger.info(msg)
                return ActionResult(error=msg)

            file_upload_el = await browser.get_locate_element(file_upload_dom_el)

            if file_upload_el is None:
                msg = f'No file upload element found at index {index}'
                logger.info(msg)
                return ActionResult(error=msg)

            try:
                await file_upload_el.set_input_files(path)
                msg = f'Successfully uploaded file to index {index}'
                logger.info(msg)
                return ActionResult(extracted_content=msg, include_in_memory=True)
            except Exception as e:
                msg = f'Failed to upload file to index {index}: {str(e)}'
                logger.info(msg)
                return ActionResult(error=msg)

    # act() is a coroutine function, so it needs the async timing decorator; the
    # sync one would only time the creation of the coroutine object.
    @time_execution_async('--act')
    async def act(
            self,
            action: ActionModel,
            browser_context: Optional[BrowserContext] = None,
            #
            page_extraction_llm: Optional[BaseChatModel] = None,
            sensitive_data: Optional[Dict[str, str]] = None,
            available_file_paths: Optional[list[str]] = None,
            #
            context: Context | None = None,
    ) -> ActionResult:
        """Execute an action.

        The first entry of the dumped action model whose parameters are not None is
        executed: names starting with ``"mcp"`` are invoked as MCP tools, everything
        else goes through the registry.

        Args:
            action: The action model to execute.
            browser_context: Passed to registry actions as ``browser``.
            page_extraction_llm: Forwarded to registry actions.
            sensitive_data: Forwarded to registry actions.
            available_file_paths: Forwarded to registry actions.
            context: Forwarded to registry actions.

        Returns:
            The action's ActionResult; a str result is wrapped as ``extracted_content``
            and a None result (or an action with no parameters set) yields an empty
            ActionResult.

        Raises:
            ValueError: If an MCP tool name is not registered, or the action returned
                an unsupported result type.
        """
        for action_name, params in action.model_dump(exclude_unset=True).items():
            if params is None:
                continue

            if action_name.startswith("mcp"):
                # this is a mcp tool
                logger.debug(f"Invoke MCP tool: {action_name}")
                registered_action = self.registry.registry.actions.get(action_name)
                if registered_action is None:
                    # Fail with a clear message instead of an AttributeError on None.
                    raise ValueError(f'MCP tool {action_name} is not registered')
                result = await registered_action.function.ainvoke(params)
            else:
                result = await self.registry.execute_action(
                    action_name,
                    params,
                    browser=browser_context,
                    page_extraction_llm=page_extraction_llm,
                    sensitive_data=sensitive_data,
                    available_file_paths=available_file_paths,
                    context=context,
                )

            if isinstance(result, str):
                return ActionResult(extracted_content=result)
            elif isinstance(result, ActionResult):
                return result
            elif result is None:
                return ActionResult()
            else:
                raise ValueError(f'Invalid action result type: {type(result)} of {result}')
        return ActionResult()

    async def setup_mcp_client(self, mcp_server_config: Optional[Dict[str, Any]] = None):
        """Store the MCP server config and, if it is non-empty, start the client and register its tools."""
        self.mcp_server_config = mcp_server_config
        if self.mcp_server_config:
            self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config)
            self.register_mcp_tools()

    def register_mcp_tools(self):
        """
        Register the MCP tools used by this controller.

        Each tool is added to the registry under the name ``mcp.<server>.<tool>``.
        Does nothing when no MCP client is set.
        """
        if self.mcp_client:
            for server_name in self.mcp_client.server_name_to_tools:
                for tool in self.mcp_client.server_name_to_tools[server_name]:
                    tool_name = f"mcp.{server_name}.{tool.name}"
                    self.registry.registry.actions[tool_name] = RegisteredAction(
                        name=tool_name,
                        description=tool.description,
                        function=tool,
                        param_model=create_tool_param_model(tool),
                    )
                    logger.info(f"Add mcp tool: {tool_name}")

    async def close_mcp_client(self):
        """Exit the MCP client's async context if a client was set up."""
        if self.mcp_client:
            await self.mcp_client.__aexit__(None, None, None)

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/mcp_client.py:
--------------------------------------------------------------------------------

```python
import os
import asyncio
import base64
import pdb
from typing import List, Tuple, Optional
from langchain_core.tools import BaseTool
from langchain_mcp_adapters.client import MultiServerMCPClient
import base64
import json
import logging
from typing import Optional, Dict, Any, Type
from langchain_core.tools import BaseTool
from pydantic.v1 import BaseModel, Field
from langchain_core.runnables import RunnableConfig
from pydantic import BaseModel, Field, create_model
from typing import Type, Dict, Any, Optional, get_type_hints, List, Union, Annotated, Set
from pydantic import BaseModel, ConfigDict, create_model, Field
from langchain.tools import BaseTool
import inspect
from datetime import datetime, date, time
import uuid
from enum import Enum
import inspect
from browser_use.controller.registry.views import ActionModel
from typing import Type, Dict, Any, Optional, get_type_hints

logger = logging.getLogger(__name__)


async def setup_mcp_client_and_tools(mcp_server_config: Dict[str, Any]) -> Optional[MultiServerMCPClient]:
    """
    Create a MultiServerMCPClient from the given configuration and enter its async context.

    Args:
        mcp_server_config: Server configuration. If it contains an "mcpServers"
            key, the value under that key is what gets passed to the client.

    Returns:
        The entered MultiServerMCPClient, or None when the configuration is empty
        or any exception occurs while creating or entering the client (the
        exception is logged, not raised).
    """

    logger.info("Initializing MultiServerMCPClient...")

    # Guard clause: nothing to connect to.
    if not mcp_server_config:
        logger.error("No MCP server configuration provided.")
        return None

    try:
        # Accept both the bare server mapping and the wrapped {"mcpServers": {...}} form.
        servers = mcp_server_config["mcpServers"] if "mcpServers" in mcp_server_config else mcp_server_config
        mcp_client = MultiServerMCPClient(servers)
        await mcp_client.__aenter__()
    except Exception as e:
        logger.error(f"Failed to setup MCP client or fetch tools: {e}", exc_info=True)
        return None

    return mcp_client


def create_tool_param_model(tool: BaseTool) -> Type[BaseModel]:
    """Creates a Pydantic model from a LangChain tool's schema.

    When the tool has an ``args_schema``, one field is generated per entry in its
    ``properties`` (type from ``resolve_type``, default, description and basic
    range/length/pattern constraints). Otherwise the fields are derived from the
    signature of the tool's ``_run`` method.

    Args:
        tool: The tool to build a parameter model for.

    Returns:
        A model class named ``<tool.name>_parameters`` that derives from ActionModel.
    """

    # Get tool schema information
    # NOTE(review): the membership/`.get` calls below assume args_schema is a
    # JSON-schema dict — confirm this holds for every tool passed in.
    json_schema = tool.args_schema
    tool_name = tool.name

    params = {}

    if json_schema is not None:
        # The tool already has a schema defined: convert its properties to fields.
        if 'properties' in json_schema:
            required_fields: Set[str] = set(json_schema.get('required', []))

            # JSON-schema keyword -> pydantic Field keyword
            constraint_keys = {
                'minimum': 'ge',
                'maximum': 'le',
                'minLength': 'min_length',
                'maxLength': 'max_length',
                'pattern': 'pattern',
            }

            for prop_name, prop_details in json_schema['properties'].items():
                field_type = resolve_type(prop_details, f"{tool_name}_{prop_name}")

                # Required fields without an explicit default get `...`; optional ones default to None.
                is_required = prop_name in required_fields
                field_kwargs = {'default': prop_details.get('default', ... if is_required else None)}

                description = prop_details.get('description', '')
                if description:
                    field_kwargs['description'] = description

                for schema_key, field_key in constraint_keys.items():
                    if schema_key in prop_details:
                        field_kwargs[field_key] = prop_details[schema_key]

                params[prop_name] = (field_type, Field(**field_kwargs))
    else:
        # No schema is defined: extract parameters from the _run method.
        run_method = tool._run
        sig = inspect.signature(run_method)

        # Get type hints for better type information
        try:
            type_hints = get_type_hints(run_method)
        except Exception:
            type_hints = {}

        for name, param in sig.parameters.items():
            if name == 'self':
                continue

            # Prefer the resolved type hint; fall back to the raw annotation, then Any.
            # The `empty` sentinel is compared by identity so that a default value's
            # own __eq__/__ne__ is never invoked.
            annotation = type_hints.get(name, param.annotation)
            if annotation is inspect.Parameter.empty:
                annotation = Any

            # Use default value if available, otherwise make it required
            if param.default is not inspect.Parameter.empty:
                params[name] = (annotation, param.default)
            else:
                params[name] = (annotation, ...)

    return create_model(
        f'{tool_name}_parameters',
        __base__=ActionModel,
        **params,  # type: ignore
    )


def resolve_type(prop_details: Dict[str, Any], prefix: str = "") -> Any:
    """Recursively map a JSON-schema fragment to a Python/Pydantic type.

    The checks below run in order and the first one that matches wins:
    ``$ref``, formatted strings, ``enum``, typed arrays, objects with
    ``properties``, ``oneOf``/``anyOf``, ``allOf`` and finally the plain
    ``type`` keyword.

    Args:
        prop_details: JSON-schema fragment describing a single property.
        prefix: Name prefix used for dynamically created enums and models.

    Returns:
        A builtin type, a ``typing`` construct, a dynamically created ``Enum``
        or a dynamically created Pydantic model. ``Any`` is returned for
        ``$ref`` fragments, empty unions and unknown type names.
    """

    # References are not resolved here, so nothing is known about the target.
    if '$ref' in prop_details:
        return Any

    # Basic type mapping
    type_mapping = {
        'string': str,
        'integer': int,
        'number': float,
        'boolean': bool,
        'array': List,
        'object': Dict,
        'null': type(None),
    }

    # Handle formatted strings; unknown formats stay plain strings
    if prop_details.get('type') == 'string' and 'format' in prop_details:
        format_mapping = {
            'date-time': datetime,
            'date': date,
            'time': time,
            'email': str,
            'uri': str,
            'url': str,
            'uuid': uuid.UUID,
            'binary': bytes,
        }
        return format_mapping.get(prop_details['format'], str)

    # Handle enum types
    if 'enum' in prop_details:
        enum_dict: Dict[str, Any] = {}
        for i, v in enumerate(prop_details['enum']):
            key = f"VALUE_{i}"
            if isinstance(v, str):
                candidate = v.upper().replace(' ', '_').replace('-', '_')
                # Only use the derived name when it is a valid identifier and
                # not already taken; otherwise two values such as "a b" and
                # "a-b" would collapse into a single member.
                if candidate.isidentifier() and candidate not in enum_dict:
                    key = candidate
            # A positional fallback can still clash with an earlier literal
            # value such as "value_1"; keep extending until it is unique.
            while key in enum_dict:
                key += "_"
            enum_dict[key] = v

        # Only create enum if we have values
        if enum_dict:
            return Enum(f"{prefix}_Enum", enum_dict)
        return str  # Fallback

    # Handle array types
    if prop_details.get('type') == 'array' and 'items' in prop_details:
        item_type = resolve_type(prop_details['items'], f"{prefix}_item")
        return List[item_type]  # type: ignore

    # Handle object types with properties
    if prop_details.get('type') == 'object' and 'properties' in prop_details:
        required_fields = prop_details.get('required', [])
        nested_params = {}
        for nested_name, nested_details in prop_details['properties'].items():
            nested_type = resolve_type(nested_details, f"{prefix}_{nested_name}")
            is_required = nested_name in required_fields
            # Required fields without an explicit default use Ellipsis, which
            # Pydantic treats as "no default".
            default_value = nested_details.get('default', ... if is_required else None)
            description = nested_details.get('description', '')

            field_kwargs = {'default': default_value}
            if description:
                field_kwargs['description'] = description

            nested_params[nested_name] = (nested_type, Field(**field_kwargs))

        # Create nested model
        return create_model(f"{prefix}_Model", **nested_params)

    # Handle union types (oneOf, anyOf)
    if 'oneOf' in prop_details or 'anyOf' in prop_details:
        # ``or []`` covers an empty/None ``oneOf`` with no ``anyOf`` present,
        # which would otherwise leave None here and break the iteration.
        union_schema = prop_details.get('oneOf') or prop_details.get('anyOf') or []
        union_types = [
            resolve_type(member, f"{prefix}_{i}")
            for i, member in enumerate(union_schema)
        ]

        if union_types:
            return Union[tuple(union_types)]  # type: ignore
        return Any

    # Handle allOf (intersection types)
    if 'allOf' in prop_details:
        nested_params = {}
        for i, schema_part in enumerate(prop_details['allOf']):
            if 'properties' not in schema_part:
                continue
            required_fields = schema_part.get('required', [])
            for nested_name, nested_details in schema_part['properties'].items():
                nested_type = resolve_type(nested_details, f"{prefix}_allOf_{i}_{nested_name}")
                is_required = nested_name in required_fields
                nested_params[nested_name] = (nested_type, ... if is_required else None)

        # Create composite model
        if nested_params:
            return create_model(f"{prefix}_CompositeModel", **nested_params)
        return Dict

    # Default to basic types
    schema_type = prop_details.get('type', 'string')
    if isinstance(schema_type, list):
        # Handle multiple types (e.g., ["string", "null"]); only the first
        # non-null entry is used as the primary type.
        non_null_types = [t for t in schema_type if t != 'null']
        if non_null_types:
            primary_type = type_mapping.get(non_null_types[0], Any)
            if 'null' in schema_type:
                return Optional[primary_type]  # type: ignore
            return primary_type
        return Any

    return type_mapping.get(schema_type, Any)

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/utils/llm_provider.py:
--------------------------------------------------------------------------------

```python
from openai import OpenAI
import pdb
from langchain_openai import ChatOpenAI
from langchain_core.globals import get_llm_cache
from langchain_core.language_models.base import (
    BaseLanguageModel,
    LangSmithParams,
    LanguageModelInput,
)
import os
from langchain_core.load import dumpd, dumps
from langchain_core.messages import (
    AIMessage,
    SystemMessage,
    AnyMessage,
    BaseMessage,
    BaseMessageChunk,
    HumanMessage,
    convert_to_messages,
    message_chunk_to_message,
)
from langchain_core.outputs import (
    ChatGeneration,
    ChatGenerationChunk,
    ChatResult,
    LLMResult,
    RunInfo,
)
from langchain_ollama import ChatOllama
from langchain_core.output_parsers.base import OutputParserLike
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_core.tools import BaseTool

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    Optional,
    Union,
    cast, List,
)
from langchain_anthropic import ChatAnthropic
from langchain_mistralai import ChatMistralAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from langchain_openai import AzureChatOpenAI, ChatOpenAI
from langchain_ibm import ChatWatsonx
from langchain_aws import ChatBedrock
from pydantic import SecretStr

from ..utils import config


class DeepSeekR1ChatOpenAI(ChatOpenAI):
    """ChatOpenAI variant that also returns the model's ``reasoning_content``.

    ``invoke``/``ainvoke`` call the chat-completions endpoint through a raw
    ``openai.OpenAI`` client and copy the ``reasoning_content`` attribute of
    the first choice onto the returned ``AIMessage``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Assigned after the base initializer has run; this is the client the
        # overridden invoke/ainvoke methods talk to.
        self.client = OpenAI(
            base_url=kwargs.get("base_url"),
            api_key=kwargs.get("api_key")
        )

    def _build_message_history(self, input: LanguageModelInput) -> list[dict[str, Any]]:
        """Convert LangChain messages to chat-completions role/content dicts."""
        message_history = []
        for input_ in input:
            if isinstance(input_, SystemMessage):
                role = "system"
            elif isinstance(input_, AIMessage):
                role = "assistant"
            else:
                # Every other message type is sent as a user turn.
                role = "user"
            message_history.append({"role": role, "content": input_.content})
        return message_history

    def _complete(self, input: LanguageModelInput) -> AIMessage:
        """Run one blocking completion and wrap the first choice."""
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=self._build_message_history(input)
        )
        message = response.choices[0].message
        return AIMessage(content=message.content, reasoning_content=message.reasoning_content)

    async def ainvoke(
            self,
            input: LanguageModelInput,
            config: Optional[RunnableConfig] = None,
            *,
            stop: Optional[list[str]] = None,
            **kwargs: Any,
    ) -> AIMessage:
        """Async variant of :meth:`invoke`.

        ``config``, ``stop`` and ``kwargs`` are accepted for interface
        compatibility but are not forwarded to the API call.
        """
        import asyncio

        # The SDK client is synchronous; run it in a worker thread so the
        # request does not block the event loop while waiting on the network.
        return await asyncio.to_thread(self._complete, input)

    def invoke(
            self,
            input: LanguageModelInput,
            config: Optional[RunnableConfig] = None,
            *,
            stop: Optional[list[str]] = None,
            **kwargs: Any,
    ) -> AIMessage:
        """Send ``input`` to the model and return content plus reasoning.

        ``config``, ``stop`` and ``kwargs`` are accepted for interface
        compatibility but are not forwarded to the API call.
        """
        return self._complete(input)


class DeepSeekR1ChatOllama(ChatOllama):
    """ChatOllama variant that separates ``<think>`` reasoning from the answer.

    The text before the first ``</think>`` tag becomes ``reasoning_content``
    and the text after it becomes the message content.
    """

    def _to_reasoning_message(self, org_content: str) -> AIMessage:
        """Split raw model output into reasoning and answer parts."""
        parts = org_content.split("</think>")
        if len(parts) == 1:
            # No closing tag: there is no reasoning block to strip, so keep
            # the whole reply as the answer instead of raising IndexError.
            reasoning_content = ""
            content = org_content
        else:
            reasoning_content = parts[0].replace("<think>", "")
            content = parts[1]
        # Keep only what follows the last "**JSON Response:**" marker.
        if "**JSON Response:**" in content:
            content = content.split("**JSON Response:**")[-1]
        return AIMessage(content=content, reasoning_content=reasoning_content)

    async def ainvoke(
            self,
            input: LanguageModelInput,
            config: Optional[RunnableConfig] = None,
            *,
            stop: Optional[list[str]] = None,
            **kwargs: Any,
    ) -> AIMessage:
        """Async variant of :meth:`invoke`.

        Only ``input`` is forwarded to the base class; ``config``, ``stop``
        and ``kwargs`` are accepted for interface compatibility.
        """
        org_ai_message = await super().ainvoke(input=input)
        return self._to_reasoning_message(org_ai_message.content)

    def invoke(
            self,
            input: LanguageModelInput,
            config: Optional[RunnableConfig] = None,
            *,
            stop: Optional[list[str]] = None,
            **kwargs: Any,
    ) -> AIMessage:
        """Invoke the model and return the answer with reasoning attached.

        Only ``input`` is forwarded to the base class; ``config``, ``stop``
        and ``kwargs`` are accepted for interface compatibility.
        """
        org_ai_message = super().invoke(input=input)
        return self._to_reasoning_message(org_ai_message.content)


def _resolve_base_url(kwargs: dict, env_var: Optional[str] = None, default: Optional[str] = None) -> Optional[str]:
    """Return ``kwargs['base_url']`` if truthy, else ``env_var``'s value, else ``default``."""
    base_url = kwargs.get("base_url")
    if base_url:
        return base_url
    if env_var is None:
        return default
    return os.getenv(env_var, default)


def get_llm_model(provider: str, **kwargs):
    """
    Build the chat model for ``provider``.

    For every provider except ``ollama`` and ``bedrock`` an API key is
    required; it is taken from ``kwargs["api_key"]`` or, failing that, from
    the ``<PROVIDER>_API_KEY`` environment variable.

    :param provider: LLM provider identifier (e.g. ``"openai"``, ``"ollama"``)
    :param kwargs: optional ``model_name``, ``temperature``, ``base_url``,
        ``api_key``, ``api_version`` (Azure), ``num_ctx`` / ``num_predict``
        (Ollama; ``num_ctx`` is also used as ``max_tokens`` for IBM)
    :return: a configured chat model instance
    :raises ValueError: if a required API key is missing or the provider is
        not supported
    """
    api_key = None
    if provider not in ["ollama", "bedrock"]:
        env_var = f"{provider.upper()}_API_KEY"
        api_key = kwargs.get("api_key", "") or os.getenv(env_var, "")
        if not api_key and provider == "siliconflow":
            # The siliconflow branch historically read this mixed-case name;
            # honour it so it still works on case-sensitive systems.
            api_key = os.getenv("SiliconFLOW_API_KEY", "")
        if not api_key:
            provider_display = config.PROVIDER_DISPLAY_NAMES.get(provider, provider.upper())
            error_msg = f"💥 {provider_display} API key not found! 🔑 Please set the `{env_var}` environment variable or provide it in the UI."
            raise ValueError(error_msg)
        kwargs["api_key"] = api_key

    temperature = kwargs.get("temperature", 0.0)

    if provider == "anthropic":
        return ChatAnthropic(
            model=kwargs.get("model_name", "claude-3-5-sonnet-20241022"),
            temperature=temperature,
            base_url=_resolve_base_url(kwargs, default="https://api.anthropic.com"),
            api_key=api_key,
        )
    elif provider == 'mistral':
        return ChatMistralAI(
            model=kwargs.get("model_name", "mistral-large-latest"),
            temperature=temperature,
            base_url=_resolve_base_url(kwargs, "MISTRAL_ENDPOINT", "https://api.mistral.ai/v1"),
            api_key=api_key,
        )
    elif provider == "openai":
        return ChatOpenAI(
            model=kwargs.get("model_name", "gpt-4o"),
            temperature=temperature,
            base_url=_resolve_base_url(kwargs, "OPENAI_ENDPOINT", "https://api.openai.com/v1"),
            api_key=api_key,
        )
    elif provider == "deepseek":
        base_url = _resolve_base_url(kwargs, "DEEPSEEK_ENDPOINT", "")

        # The reasoner model needs the wrapper that surfaces reasoning_content.
        if kwargs.get("model_name", "deepseek-chat") == "deepseek-reasoner":
            return DeepSeekR1ChatOpenAI(
                model=kwargs.get("model_name", "deepseek-reasoner"),
                temperature=temperature,
                base_url=base_url,
                api_key=api_key,
            )
        else:
            return ChatOpenAI(
                model=kwargs.get("model_name", "deepseek-chat"),
                temperature=temperature,
                base_url=base_url,
                api_key=api_key,
            )
    elif provider == "google":
        return ChatGoogleGenerativeAI(
            model=kwargs.get("model_name", "gemini-2.0-flash-exp"),
            temperature=temperature,
            api_key=api_key,
        )
    elif provider == "ollama":
        base_url = _resolve_base_url(kwargs, "OLLAMA_ENDPOINT", "http://localhost:11434")

        # deepseek-r1 models emit <think> blocks that the wrapper strips.
        if "deepseek-r1" in kwargs.get("model_name", "qwen2.5:7b"):
            return DeepSeekR1ChatOllama(
                model=kwargs.get("model_name", "deepseek-r1:14b"),
                temperature=temperature,
                num_ctx=kwargs.get("num_ctx", 32000),
                base_url=base_url,
            )
        else:
            return ChatOllama(
                model=kwargs.get("model_name", "qwen2.5:7b"),
                temperature=temperature,
                num_ctx=kwargs.get("num_ctx", 32000),
                num_predict=kwargs.get("num_predict", 1024),
                base_url=base_url,
            )
    elif provider == "azure_openai":
        api_version = kwargs.get("api_version", "") or os.getenv("AZURE_OPENAI_API_VERSION", "2025-01-01-preview")
        return AzureChatOpenAI(
            model=kwargs.get("model_name", "gpt-4o"),
            temperature=temperature,
            api_version=api_version,
            azure_endpoint=_resolve_base_url(kwargs, "AZURE_OPENAI_ENDPOINT", ""),
            api_key=api_key,
        )
    elif provider == "alibaba":
        return ChatOpenAI(
            model=kwargs.get("model_name", "qwen-plus"),
            temperature=temperature,
            base_url=_resolve_base_url(
                kwargs, "ALIBABA_ENDPOINT", "https://dashscope.aliyuncs.com/compatible-mode/v1"
            ),
            api_key=api_key,
        )
    elif provider == "ibm":
        parameters = {
            "temperature": temperature,
            "max_tokens": kwargs.get("num_ctx", 32000)
        }
        return ChatWatsonx(
            model_id=kwargs.get("model_name", "ibm/granite-vision-3.1-2b-preview"),
            url=_resolve_base_url(kwargs, "IBM_ENDPOINT", "https://us-south.ml.cloud.ibm.com"),
            project_id=os.getenv("IBM_PROJECT_ID"),
            # Use the key validated above (kwargs first, then IBM_API_KEY)
            # rather than re-reading only the environment.
            apikey=api_key,
            params=parameters
        )
    elif provider == "moonshot":
        return ChatOpenAI(
            model=kwargs.get("model_name", "moonshot-v1-32k-vision-preview"),
            temperature=temperature,
            base_url=_resolve_base_url(kwargs, "MOONSHOT_ENDPOINT"),
            # Use the key validated above (kwargs first, then MOONSHOT_API_KEY).
            api_key=api_key,
        )
    elif provider == "unbound":
        return ChatOpenAI(
            model=kwargs.get("model_name", "gpt-4o-mini"),
            temperature=temperature,
            base_url=_resolve_base_url(kwargs, "UNBOUND_ENDPOINT", "https://api.getunbound.ai"),
            api_key=api_key,
        )
    elif provider == "siliconflow":
        return ChatOpenAI(
            api_key=api_key,
            base_url=_resolve_base_url(kwargs, "SiliconFLOW_ENDPOINT", ""),
            model_name=kwargs.get("model_name", "Qwen/QwQ-32B"),
            temperature=temperature,
        )
    else:
        raise ValueError(f"Unsupported provider: {provider}")

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/config.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List, Optional, Union

from pydantic import Field, SecretStr, field_validator, ValidationInfo
from pydantic_settings import BaseSettings, SettingsConfigDict


class LLMSettings(BaseSettings):
    """LLM provider configuration.

    Each field is read from the environment variable formed by the
    ``MCP_LLM_`` prefix plus the upper-cased field name (for example
    ``MCP_LLM_PROVIDER`` or ``MCP_LLM_OPENAI_API_KEY``). The ``env=`` keyword
    on ``Field`` is not used: pydantic-settings v2 does not honour it, so it
    only suggested variable names that were never actually read.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_LLM_")

    provider: str = Field(default="google")
    model_name: str = Field(default="gemini-2.5-flash-preview-04-17")
    temperature: float = Field(default=0.0)
    base_url: Optional[str] = Field(default=None)
    api_key: Optional[SecretStr] = Field(default=None)  # Generic API key

    # Provider-specific API keys
    openai_api_key: Optional[SecretStr] = Field(default=None)
    anthropic_api_key: Optional[SecretStr] = Field(default=None)
    google_api_key: Optional[SecretStr] = Field(default=None)
    azure_openai_api_key: Optional[SecretStr] = Field(default=None)
    deepseek_api_key: Optional[SecretStr] = Field(default=None)
    mistral_api_key: Optional[SecretStr] = Field(default=None)
    openrouter_api_key: Optional[SecretStr] = Field(default=None)
    alibaba_api_key: Optional[SecretStr] = Field(default=None)
    moonshot_api_key: Optional[SecretStr] = Field(default=None)
    unbound_api_key: Optional[SecretStr] = Field(default=None)

    # Provider-specific endpoints
    openai_endpoint: Optional[str] = Field(default=None)
    anthropic_endpoint: Optional[str] = Field(default=None)
    azure_openai_endpoint: Optional[str] = Field(default=None)
    azure_openai_api_version: str = Field(default="2025-01-01-preview")
    deepseek_endpoint: Optional[str] = Field(default=None)
    mistral_endpoint: Optional[str] = Field(default=None)
    ollama_endpoint: str = Field(default="http://localhost:11434")
    openrouter_endpoint: str = Field(default="https://openrouter.ai/api/v1")
    alibaba_endpoint: Optional[str] = Field(default=None)
    moonshot_endpoint: Optional[str] = Field(default=None)
    unbound_endpoint: Optional[str] = Field(default=None)

    ollama_num_ctx: Optional[int] = Field(default=32000)
    ollama_num_predict: Optional[int] = Field(default=1024)

    # Planner LLM settings (optional, defaults to main LLM if not set)
    planner_provider: Optional[str] = Field(default=None)
    planner_model_name: Optional[str] = Field(default=None)
    planner_temperature: Optional[float] = Field(default=None)
    planner_base_url: Optional[str] = Field(default=None)
    planner_api_key: Optional[SecretStr] = Field(default=None)


class BrowserSettings(BaseSettings):
    """General browser configuration.

    Each field is read from ``MCP_BROWSER_`` plus the upper-cased field name
    (for example ``MCP_BROWSER_HEADLESS``). The ``env=`` keyword on ``Field``
    is not used because pydantic-settings v2 does not honour it.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_BROWSER_")

    headless: bool = Field(default=False)  # General headless
    disable_security: bool = Field(default=False)  # General disable security
    binary_path: Optional[str] = Field(default=None)
    user_data_dir: Optional[str] = Field(default=None)
    window_width: int = Field(default=1280)
    window_height: int = Field(default=1080)
    use_own_browser: bool = Field(default=False)
    cdp_url: Optional[str] = Field(default=None)
    wss_url: Optional[str] = Field(default=None)  # For CDP connection if needed
    keep_open: bool = Field(default=False)  # Server-managed browser persistence
    trace_path: Optional[str] = Field(default=None)


class AgentToolSettings(BaseSettings):
    """Configuration for the browser agent tool.

    Each field is read from ``MCP_AGENT_TOOL_`` plus the upper-cased field
    name (for example ``MCP_AGENT_TOOL_MAX_STEPS``). The ``env=`` keyword on
    ``Field`` is not used because pydantic-settings v2 does not honour it.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_AGENT_TOOL_")

    max_steps: int = Field(default=100)
    max_actions_per_step: int = Field(default=5)
    tool_calling_method: Optional[str] = Field(default="auto")
    max_input_tokens: Optional[int] = Field(default=128000)
    use_vision: bool = Field(default=True)

    # Browser settings specific to this tool, can override general MCP_BROWSER_ settings
    headless: Optional[bool] = Field(default=None)
    disable_security: Optional[bool] = Field(default=None)

    enable_recording: bool = Field(default=False)
    save_recording_path: Optional[str] = Field(default=None)  # e.g. ./tmp/recordings
    history_path: Optional[str] = Field(default=None)  # e.g. ./tmp/agent_history


class DeepResearchToolSettings(BaseSettings):
    """Configuration for the deep research tool.

    Each field is read from ``MCP_RESEARCH_TOOL_`` plus the upper-cased field
    name (for example ``MCP_RESEARCH_TOOL_SAVE_DIR``). The ``env=`` keyword on
    ``Field`` is not used because pydantic-settings v2 does not honour it.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_RESEARCH_TOOL_")

    max_parallel_browsers: int = Field(default=3)
    save_dir: Optional[str] = Field(default=None)  # Base dir, task_id will be appended. Optional now.


class PathSettings(BaseSettings):
    """Filesystem path configuration.

    Each field is read from ``MCP_PATHS_`` plus the upper-cased field name
    (``MCP_PATHS_DOWNLOADS``). The ``env=`` keyword on ``Field`` is not used
    because pydantic-settings v2 does not honour it.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_PATHS_")
    downloads: Optional[str] = Field(default=None)  # e.g. ./tmp/downloads


class ServerSettings(BaseSettings):
    """Server-level configuration (logging, telemetry, MCP client config).

    Each field is read from ``MCP_SERVER_`` plus the upper-cased field name
    (for example ``MCP_SERVER_LOGGING_LEVEL``). The ``env=`` keyword on
    ``Field`` is not used because pydantic-settings v2 does not honour it.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_SERVER_")
    log_file: Optional[str] = Field(default=None)
    logging_level: str = Field(default="ERROR")
    anonymized_telemetry: bool = Field(default=True)
    mcp_config: Optional[Dict[str, Any]] = Field(default=None)  # For controller's MCP client


class AppSettings(BaseSettings):
    """Root settings object that aggregates every settings group.

    Also provides helpers that pick the API key, endpoint and full keyword
    dictionary for either the main LLM or the planner LLM.
    """

    model_config = SettingsConfigDict(env_prefix="MCP_", extra='ignore') # Root prefix

    llm: LLMSettings = Field(default_factory=LLMSettings)
    browser: BrowserSettings = Field(default_factory=BrowserSettings)
    agent_tool: AgentToolSettings = Field(default_factory=AgentToolSettings)
    research_tool: DeepResearchToolSettings = Field(default_factory=DeepResearchToolSettings)
    paths: PathSettings = Field(default_factory=PathSettings)
    server: ServerSettings = Field(default_factory=ServerSettings)

    @field_validator('server', 'llm', 'browser', 'agent_tool', 'research_tool', 'paths', mode='before')
    @classmethod
    def ensure_nested_defaults(cls, v: Any) -> Any:
        """Replace a ``None`` nested-settings value with an empty mapping."""
        # An empty dict lets the nested settings class fall back to its own
        # defaults instead of failing validation on None.
        return {} if v is None else v

    def get_api_key_for_provider(self, provider_name: Optional[str], is_planner: bool = False) -> Optional[str]:
        """Return the API key to use for ``provider_name``.

        Lookup order: the planner key (planner only), the generic key (main
        LLM only), then the ``<provider>_api_key`` field. Returns ``None``
        when nothing is configured.
        """
        llm = self.llm
        if provider_name:
            provider = provider_name
        elif is_planner:
            provider = llm.planner_provider
        else:
            provider = llm.provider

        # A dedicated planner key always wins for planner lookups.
        if is_planner and llm.planner_api_key:
            return llm.planner_api_key.get_secret_value()

        if not provider:  # Should not happen if called correctly
            return None

        # The generic key belongs to the main LLM only.
        if llm.api_key and not is_planner:
            return llm.api_key.get_secret_value()

        candidate = getattr(llm, f"{provider.lower()}_api_key", None)
        if candidate and isinstance(candidate, SecretStr):
            return candidate.get_secret_value()
        return None

    def get_endpoint_for_provider(self, provider_name: Optional[str], is_planner: bool = False) -> Optional[str]:
        """Return the endpoint to use for ``provider_name``.

        Lookup order: the planner base URL (planner only), the generic base
        URL (main LLM only), then the ``<provider>_endpoint`` field. Returns
        ``None`` when no such field exists.
        """
        llm = self.llm
        if provider_name:
            provider = provider_name
        elif is_planner:
            provider = llm.planner_provider
        else:
            provider = llm.provider

        if is_planner and llm.planner_base_url:
            return llm.planner_base_url

        if not provider:
            return None

        # The generic base URL belongs to the main LLM only.
        if llm.base_url and not is_planner:
            return llm.base_url

        return getattr(llm, f"{provider.lower()}_endpoint", None)

    def get_llm_config(self, is_planner: bool = False) -> Dict[str, Any]:
        """Returns a dictionary of LLM settings suitable for llm_provider.get_llm_model."""
        llm = self.llm
        provider = llm.provider
        model_name = llm.model_name
        temperature = llm.temperature
        if is_planner:
            # Planner values override the main LLM values only when they are set.
            if llm.planner_provider:
                provider = llm.planner_provider
            if llm.planner_model_name:
                model_name = llm.planner_model_name
            if llm.planner_temperature is not None:
                temperature = llm.planner_temperature

        llm_config: Dict[str, Any] = {
            "provider": provider,
            "model_name": model_name,
            "temperature": temperature,
            "api_key": self.get_api_key_for_provider(provider, is_planner=is_planner),
            "base_url": self.get_endpoint_for_provider(provider, is_planner=is_planner),
            # Planners typically don't need vision
            "use_vision": False if is_planner else self.agent_tool.use_vision,
            "tool_calling_method": "auto" if is_planner else self.agent_tool.tool_calling_method,
            "max_input_tokens": None if is_planner else self.agent_tool.max_input_tokens,
        }

        if provider == "azure_openai":
            llm_config["azure_openai_api_version"] = llm.azure_openai_api_version
        elif provider == "ollama":
            llm_config["ollama_num_ctx"] = llm.ollama_num_ctx
            llm_config["ollama_num_predict"] = llm.ollama_num_predict
        elif provider == "openrouter":
            # OpenRouter is reported as the "openai" provider.
            llm_config["provider"] = "openai"

        return llm_config

# Global settings instance, to be imported by other modules
settings = AppSettings()

# Example usage (for testing this file directly):
if __name__ == "__main__":

    def _mask(secret: Optional[str]) -> str:
        """Report whether a secret is present without revealing its value."""
        return "<set>" if secret else "<not set>"

    def _masked_llm_config(llm_config: Dict[str, Any]) -> Dict[str, Any]:
        """Copy ``llm_config`` with the raw API key replaced by a marker."""
        return {**llm_config, "api_key": _mask(llm_config.get("api_key"))}

    try:
        print("Loaded AppSettings:")
        print(settings.model_dump_json(indent=2))
        # API keys are masked so running this self-test never writes secrets
        # to the terminal or to captured output.
        main_key = settings.get_api_key_for_provider(settings.llm.provider)
        print(f"\nLLM API Key for main provider ({settings.llm.provider}): {_mask(main_key)}")
        if settings.llm.planner_provider:
            planner_key = settings.get_api_key_for_provider(settings.llm.planner_provider, is_planner=True)
            print(f"LLM API Key for planner provider ({settings.llm.planner_provider}): {_mask(planner_key)}")

        print("\nMain LLM Config for get_llm_model:")
        print(_masked_llm_config(settings.get_llm_config()))
        if settings.llm.planner_provider:
            print("\nPlanner LLM Config for get_llm_model:")
            print(_masked_llm_config(settings.get_llm_config(is_planner=True)))
    except Exception as e:
        print(f"Error during settings load or test: {e}")
        import os
        print("MCP_RESEARCH_TOOL_SAVE_DIR:", os.getenv("MCP_RESEARCH_TOOL_SAVE_DIR"))

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/cli.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import logging
import os
import sys
import traceback
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import typer
from dotenv import load_dotenv

from .config import AppSettings, settings as global_settings # Import AppSettings and the global instance
# Import from _internal
from ._internal.agent.browser_use.browser_use_agent import BrowserUseAgent, AgentHistoryList
from ._internal.agent.deep_research.deep_research_agent import DeepResearchAgent
from ._internal.browser.custom_browser import CustomBrowser
from ._internal.browser.custom_context import (
    CustomBrowserContext,
    CustomBrowserContextConfig,
)
from ._internal.controller.custom_controller import CustomController
from ._internal.utils import llm_provider as internal_llm_provider
from browser_use.browser.browser import BrowserConfig
from browser_use.agent.views import AgentOutput
from browser_use.browser.views import BrowserState

# Typer application object; the root callback below is registered on it.
app = typer.Typer(name="mcp-browser-cli", help="CLI for mcp-browser-use tools.")
# Named logger used for CLI messages.
logger = logging.getLogger("mcp_browser_cli")

class CLIState:
    """Holder for state shared across the CLI."""

    # None until settings have been loaded.
    settings: Optional[AppSettings] = None

# Module-level instance of the shared CLI state.
cli_state = CLIState()

def setup_logging(level_str: str, log_file: Optional[str]):
    """Configure the root logger for the CLI.

    Args:
        level_str: Level name such as ``"DEBUG"`` (case-insensitive). Names
            that do not resolve to a numeric logging level fall back to INFO.
        log_file: File that log records are appended to. When falsy, records
            go to the default stream handler instead.
    """
    numeric_level = getattr(logging, level_str.upper(), None)
    if not isinstance(numeric_level, int):
        # Covers unknown names as well as non-level attributes of the logging
        # module (e.g. "BASIC_FORMAT"), which would make basicConfig raise.
        numeric_level = logging.INFO

    # force=True removes *and closes* any handlers already attached to the
    # root logger, so they are not detached by hand first (that would leave
    # open file handlers behind).
    logging.basicConfig(
        level=numeric_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        filename=log_file or None,
        filemode="a",
        force=True,
    )

@app.callback()
def main_callback(
    ctx: typer.Context,
    env_file: Optional[Path] = typer.Option(
        None, "--env-file", "-e", help="Path to .env file to load.", exists=True, dir_okay=False, resolve_path=True
    ),
    log_level: Optional[str] = typer.Option(
        None, "--log-level", "-l", help="Override logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)."
    )
):
    """
    MCP Browser Use CLI. Settings are loaded from environment variables.
    You can use an .env file for convenience.
    """
    # The docstring above is kept unchanged; it is presumably what Typer shows
    # as the CLI help text.
    #
    # Flow: load the optional .env file, rebuild AppSettings from the resulting
    # environment, then configure logging. Exits with code 1 if the settings
    # cannot be constructed.
    if env_file:
        load_dotenv(env_file, override=True)

    # Reload settings after .env might have been loaded and to apply overrides
    try:
        cli_state.settings = AppSettings()
    except Exception as e:
        # This can happen if mandatory fields (like MCP_RESEARCH_TOOL_SAVE_DIR) are not set
        sys.stderr.write(f"Error loading application settings: {e}\n")
        sys.stderr.write("Please ensure all mandatory environment variables are set (e.g., MCP_RESEARCH_TOOL_SAVE_DIR).\n")
        raise typer.Exit(code=1) from e

    # Setup logging based on final settings (env file, then env vars, then CLI override)
    final_log_level = log_level if log_level else cli_state.settings.server.logging_level
    final_log_file = cli_state.settings.server.log_file
    setup_logging(final_log_level, final_log_file)

    # Emitted only now: before setup_logging() no handler is configured, so an
    # INFO record logged earlier would be dropped.
    if env_file:
        logger.info(f"Loaded environment variables from: {env_file}")
    logger.info(f"CLI initialized. Effective log level: {final_log_level.upper()}")


async def cli_ask_human_callback(query: str, browser_context: Any) -> Dict[str, Any]:
    """Show the agent's question on the terminal and return what the user types.

    Args:
        query: The question text to display.
        browser_context: Accepted to satisfy the callback signature; not used here.

    Returns:
        A dict with the single key ``"response"`` holding the entered text.
    """
    question_banner = typer.style(f"\n🤖 AGENT ASKS: {query}", fg=typer.colors.YELLOW)
    print(question_banner)

    prompt_label = typer.style("Your response", fg=typer.colors.CYAN)
    answer = typer.prompt(prompt_label)
    return {"response": answer}

def cli_on_step_callback(browser_state: BrowserState, agent_output: AgentOutput, step_num: int):
    """Print a per-step progress report for a running BrowserUseAgent.

    Sections are printed only when the corresponding attribute exists and is
    truthy: the agent's ``current_state``, its ``action`` list, and the
    browser state's ``observation`` (cut to 200 characters when longer).
    """
    print(typer.style(f"\n--- Step {step_num} ---", fg=typer.colors.BLUE, bold=True))

    # Agent state section.
    agent_state = getattr(agent_output, "current_state", None)
    if agent_state:
        print(typer.style("🧠 Agent State:", fg=typer.colors.MAGENTA))
        print(agent_state)

    # Actions section.
    planned_actions = getattr(agent_output, "action", None)
    if planned_actions:
        print(typer.style("🎬 Actions:", fg=typer.colors.GREEN))
        for item in planned_actions:
            kind = getattr(item, "action_type", None)
            payload = getattr(item, "action_input", None)
            if kind is None and payload is None:
                # Neither field is present: fall back to the action's own repr.
                print(f"  - {item}")
                continue
            print(f"  - {kind or 'Unknown action'}: {payload or ''}")

    # Observation section.
    obs = getattr(browser_state, "observation", None)
    if obs:
        print(typer.style("👀 Observation:", fg=typer.colors.CYAN))
        obs_text = str(obs)
        if len(obs_text) > 200:
            print(obs_text[:200] + "...")
        else:
            print(obs)


async def _run_browser_agent_logic_cli(task_str: str, current_settings: AppSettings) -> str:
    """Run one browser-agent task from the CLI and return its final result text.

    Builds the LLM(s), controller, browser and a fresh browser context from
    ``current_settings``, runs a ``BrowserUseAgent`` and tears the resources
    down again.

    Args:
        task_str: The task description handed to the agent.
        current_settings: Settings to read LLM, browser and agent options from.

    Returns:
        The agent's final result, a placeholder when it produced none, or
        ``"Error: <message>"`` when anything in the run raised. Exceptions are
        logged, not propagated.
    """
    logger.info(f"CLI: Starting run_browser_agent task: {task_str[:100]}...")
    agent_task_id = str(uuid.uuid4())
    final_result = "Error: Agent execution failed."

    browser_instance: Optional[CustomBrowser] = None
    context_instance: Optional[CustomBrowserContext] = None
    controller_instance: Optional[CustomController] = None
    # True only when we attach over CDP to a browser we did not start; such a
    # browser must be left running on exit.
    attached_to_external_browser = False

    try:
        # LLM Setup
        main_llm_config = current_settings.get_llm_config()
        main_llm = internal_llm_provider.get_llm_model(**main_llm_config)
        planner_llm = None
        if current_settings.llm.planner_provider and current_settings.llm.planner_model_name:
            planner_llm_config = current_settings.get_llm_config(is_planner=True)
            planner_llm = internal_llm_provider.get_llm_model(**planner_llm_config)

        # Controller Setup
        controller_instance = CustomController(ask_assistant_callback=cli_ask_human_callback)
        if current_settings.server.mcp_config:
            mcp_dict_config = current_settings.server.mcp_config
            if isinstance(mcp_dict_config, str):
                # The config may arrive as a JSON string.
                mcp_dict_config = json.loads(mcp_dict_config)
            await controller_instance.setup_mcp_client(mcp_dict_config)

        # Browser and Context Setup: agent-tool overrides win when they are set.
        agent_headless_override = current_settings.agent_tool.headless
        browser_headless = agent_headless_override if agent_headless_override is not None else current_settings.browser.headless
        agent_disable_security_override = current_settings.agent_tool.disable_security
        browser_disable_security = agent_disable_security_override if agent_disable_security_override is not None else current_settings.browser.disable_security

        if current_settings.browser.use_own_browser and current_settings.browser.cdp_url:
            attached_to_external_browser = True
            browser_cfg = BrowserConfig(
                cdp_url=current_settings.browser.cdp_url,
                wss_url=current_settings.browser.wss_url,
                user_data_dir=current_settings.browser.user_data_dir,
            )
        else:
            browser_cfg = BrowserConfig(
                headless=browser_headless,
                disable_security=browser_disable_security,
                browser_binary_path=current_settings.browser.binary_path,
                user_data_dir=current_settings.browser.user_data_dir,
                window_width=current_settings.browser.window_width,
                window_height=current_settings.browser.window_height,
            )
        browser_instance = CustomBrowser(config=browser_cfg)

        context_cfg = CustomBrowserContextConfig(
            trace_path=current_settings.browser.trace_path,
            save_downloads_path=current_settings.paths.downloads,
            save_recording_path=current_settings.agent_tool.save_recording_path if current_settings.agent_tool.enable_recording else None,
            force_new_context=True # CLI always gets a new context
        )
        context_instance = await browser_instance.new_context(config=context_cfg)

        # History goes to <history_path>/<task id>/<task id>.json when configured.
        agent_history_json_file = None
        task_history_base_path = current_settings.agent_tool.history_path

        if task_history_base_path:
            task_specific_history_dir = Path(task_history_base_path) / agent_task_id
            task_specific_history_dir.mkdir(parents=True, exist_ok=True)
            agent_history_json_file = str(task_specific_history_dir / f"{agent_task_id}.json")
            logger.info(f"Agent history will be saved to: {agent_history_json_file}")

        # Agent Instantiation
        agent_instance = BrowserUseAgent(
            task=task_str, llm=main_llm,
            browser=browser_instance, browser_context=context_instance, controller=controller_instance,
            planner_llm=planner_llm,
            max_actions_per_step=current_settings.agent_tool.max_actions_per_step,
            use_vision=current_settings.agent_tool.use_vision,
            register_new_step_callback=cli_on_step_callback,
        )

        # Run Agent
        history: AgentHistoryList = await agent_instance.run(max_steps=current_settings.agent_tool.max_steps)
        # Only persist history when a destination was configured (same rule as
        # the server tool); never call save_history with no path.
        if agent_history_json_file:
            agent_instance.save_history(agent_history_json_file)
        final_result = history.final_result() or "Agent finished without a final result."
        logger.info(f"CLI Agent task {agent_task_id} completed.")

    except Exception as e:
        logger.error(f"CLI Error in run_browser_agent: {e}\n{traceback.format_exc()}")
        final_result = f"Error: {e}"
    finally:
        # Collect the teardown steps, then run each one independently so a
        # failing close cannot skip the others or replace the result.
        cleanup_steps = []
        if context_instance:
            cleanup_steps.append(("browser context", context_instance.close))
        if browser_instance and not attached_to_external_browser:
            # Only close if we launched it
            cleanup_steps.append(("browser", browser_instance.close))
        if controller_instance:
            cleanup_steps.append(("MCP client", controller_instance.close_mcp_client))

        for label, closer in cleanup_steps:
            try:
                await closer()
            except Exception as cleanup_err:
                logger.warning(f"CLI: failed to close {label}: {cleanup_err}")

    return final_result


async def _run_deep_research_logic_cli(research_task_str: str, max_parallel_browsers_override: Optional[int], current_settings: AppSettings) -> str:
    """Run one deep-research task from the CLI and return the report text.

    Args:
        research_task_str: Topic or question to research.
        max_parallel_browsers_override: When not ``None``, used instead of
            ``research_tool.max_parallel_browsers`` from the settings.
        current_settings: Settings to read LLM, browser and research options from.

    Returns:
        The report (read from the report file when one exists, otherwise taken
        from the in-memory result), a diagnostic message when no report is
        available, or ``"Error: <message>"`` when the run raised. Exceptions
        are logged, not propagated.
    """
    logger.info(f"CLI: Starting run_deep_research task: {research_task_str[:100]}...")
    task_id = str(uuid.uuid4())
    report_content = "Error: Deep research failed."

    try:
        main_llm_config = current_settings.get_llm_config()
        research_llm = internal_llm_provider.get_llm_model(**main_llm_config)

        # Browser options handed to DeepResearchAgent as a plain dict.
        dr_browser_cfg = {
            "headless": current_settings.browser.headless,
            "disable_security": current_settings.browser.disable_security,
            "browser_binary_path": current_settings.browser.binary_path,
            "user_data_dir": current_settings.browser.user_data_dir,
            "window_width": current_settings.browser.window_width,
            "window_height": current_settings.browser.window_height,
            "trace_path": current_settings.browser.trace_path,
            "save_downloads_path": current_settings.paths.downloads,
        }
        if current_settings.browser.use_own_browser and current_settings.browser.cdp_url:
            dr_browser_cfg["cdp_url"] = current_settings.browser.cdp_url
            dr_browser_cfg["wss_url"] = current_settings.browser.wss_url

        mcp_server_config_for_agent = None
        if current_settings.server.mcp_config:
            mcp_server_config_for_agent = current_settings.server.mcp_config
            if isinstance(mcp_server_config_for_agent, str):
                # The config may arrive as a JSON string.
                mcp_server_config_for_agent = json.loads(mcp_server_config_for_agent)

        agent_instance = DeepResearchAgent(
            llm=research_llm, browser_config=dr_browser_cfg,
            mcp_server_config=mcp_server_config_for_agent,
        )

        current_max_parallel_browsers = max_parallel_browsers_override if max_parallel_browsers_override is not None else current_settings.research_tool.max_parallel_browsers

        # Like the server tool, tolerate a missing save_dir: the agent is then
        # run with save_dir=None instead of failing on os.path.join(None, ...).
        base_save_dir = current_settings.research_tool.save_dir
        save_dir_for_task = None
        if base_save_dir:
            save_dir_for_task = os.path.join(base_save_dir, task_id)
            os.makedirs(save_dir_for_task, exist_ok=True)
            logger.info(f"CLI Deep research save directory: {save_dir_for_task}")
        else:
            logger.info("CLI No save_dir configured. Deep research will operate in memory-only mode.")
        logger.info(f"CLI Using max_parallel_browsers: {current_max_parallel_browsers}")

        result_dict = await agent_instance.run(
            topic=research_task_str, task_id=task_id,
            save_dir=save_dir_for_task, max_parallel_browsers=current_max_parallel_browsers
        )

        report_file_path = result_dict.get("report_file_path")
        if report_file_path and os.path.exists(report_file_path):
            with open(report_file_path, "r", encoding="utf-8") as f:
                markdown_content = f.read()
            report_content = f"Deep research report generated successfully at {report_file_path}\n\n{markdown_content}"
            logger.info(f"CLI Deep research task {task_id} completed. Report at {report_file_path}")
        elif result_dict.get("status") == "completed" and result_dict.get("final_report"):
            # No report file on disk, but the agent returned the report text.
            report_content = f"Deep research completed. Report content:\n\n{result_dict['final_report']}"
            logger.info(f"CLI Deep research task {task_id} completed. Report content retrieved directly.")
        else:
            report_content = f"Deep research completed, but report file not found. Result: {result_dict}"
            logger.warning(f"CLI Deep research task {task_id} result: {result_dict}, report file path missing or invalid.")

    except Exception as e:
        logger.error(f"CLI Error in run_deep_research: {e}\n{traceback.format_exc()}")
        report_content = f"Error: {e}"

    return report_content


@app.command()
def run_browser_agent(
    task: str = typer.Argument(..., help="The primary task or objective for the browser agent."),
):
    """Runs a browser agent task and prints the result."""
    # Guard: the app callback must have stored settings before any command runs.
    active_settings = cli_state.settings
    if not active_settings:
        typer.secho("Error: Application settings not loaded. Use --env-file or set environment variables.", fg=typer.colors.RED)
        raise typer.Exit(code=1)

    typer.secho(f"Executing browser agent task: {task}", fg=typer.colors.GREEN)
    try:
        # Drive the async implementation to completion on a fresh event loop.
        agent_coro = _run_browser_agent_logic_cli(task, active_settings)
        outcome = asyncio.run(agent_coro)
        typer.secho("\n--- Agent Final Result ---", fg=typer.colors.BLUE, bold=True)
        print(outcome)
    except Exception as exc:
        # Report on the terminal and in the log, then exit with status 1.
        typer.secho(f"CLI command failed: {exc}", fg=typer.colors.RED)
        logger.error(f"CLI run_browser_agent command failed: {exc}\n{traceback.format_exc()}")
        raise typer.Exit(code=1)

@app.command()
def run_deep_research(
    research_task: str = typer.Argument(..., help="The topic or question for deep research."),
    max_parallel_browsers: Optional[int] = typer.Option(None, "--max-parallel-browsers", "-p", help="Override max parallel browsers from settings.")
):
    """Performs deep web research and prints the report."""
    # Guard: the app callback must have stored settings before any command runs.
    active_settings = cli_state.settings
    if not active_settings:
        typer.secho("Error: Application settings not loaded. Use --env-file or set environment variables.", fg=typer.colors.RED)
        raise typer.Exit(code=1)

    typer.secho(f"Executing deep research task: {research_task}", fg=typer.colors.GREEN)
    try:
        # Drive the async implementation to completion on a fresh event loop.
        research_coro = _run_deep_research_logic_cli(research_task, max_parallel_browsers, active_settings)
        report = asyncio.run(research_coro)
        typer.secho("\n--- Deep Research Final Report ---", fg=typer.colors.BLUE, bold=True)
        print(report)
    except Exception as exc:
        # Report on the terminal and in the log, then exit with status 1.
        typer.secho(f"CLI command failed: {exc}", fg=typer.colors.RED)
        logger.error(f"CLI run_deep_research command failed: {exc}\n{traceback.format_exc()}")
        raise typer.Exit(code=1)

if __name__ == "__main__":
    # Entry point for direct execution: `python src/mcp_server_browser_use/cli.py ...`
    # Development defaults are filled in only for variables that are unset or
    # empty; the log level can still be overridden by CLI args.
    if not os.environ.get("MCP_SERVER_LOGGING_LEVEL"):
        os.environ["MCP_SERVER_LOGGING_LEVEL"] = "DEBUG"

    # The research save dir is treated as mandatory, so provide one for local dev.
    if not os.environ.get("MCP_RESEARCH_TOOL_SAVE_DIR"):
        sys.stderr.write(
            "Warning: MCP_RESEARCH_TOOL_SAVE_DIR not set. Defaulting to './tmp/deep_research_cli_default' for this run.\n"
        )
        os.environ["MCP_RESEARCH_TOOL_SAVE_DIR"] = "./tmp/deep_research_cli_default"

    app()

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import logging
import os
import traceback
import uuid
from typing import Any, Dict, Optional
from pathlib import Path


from .config import settings # Import global AppSettings instance

# Configure root logging from the global settings at import time. This runs
# before the browser_use / mcp imports further down in this module.
log_level_str = settings.server.logging_level.upper()
# Level names the logging module does not define fall back to INFO.
numeric_level = getattr(logging, log_level_str, logging.INFO)

# Detach any handlers already on the root logger to avoid duplicate messages
# if basicConfig was called elsewhere or by a library.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    level=numeric_level,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    filename=settings.server.log_file if settings.server.log_file else None,
    filemode="a" if settings.server.log_file else None, # append mode; only meaningful when filename is set
    force=True # Override any previous basicConfig
)

# Logger used throughout this server module.
logger = logging.getLogger("mcp_server_browser_use")
# Disabling propagation on the root logger was considered but left out as
# possibly too aggressive; basicConfig(force=True) is relied on instead:
# logging.getLogger().propagate = False

from browser_use.browser.browser import BrowserConfig
from mcp.server.fastmcp import Context, FastMCP

# Import from _internal
from ._internal.agent.browser_use.browser_use_agent import BrowserUseAgent
from ._internal.agent.deep_research.deep_research_agent import DeepResearchAgent
from ._internal.browser.custom_browser import CustomBrowser
from ._internal.browser.custom_context import (
    CustomBrowserContext,
    CustomBrowserContextConfig,
)
from ._internal.controller.custom_controller import CustomController
from ._internal.utils import llm_provider as internal_llm_provider # aliased

from browser_use.agent.views import (
    AgentHistoryList,
)

# Shared resources reused across tool calls when settings.browser.keep_open is
# set (MCP_BROWSER_KEEP_OPEN). All start as None and are assigned by
# get_browser_and_context() / get_controller().
shared_browser_instance: Optional[CustomBrowser] = None
shared_context_instance: Optional[CustomBrowserContext] = None
shared_controller_instance: Optional[CustomController] = None # Controller might also be shared
# Held by run_browser_agent while it acquires the browser, context and controller.
resource_lock = asyncio.Lock()


async def get_controller(ask_human_callback: Optional[Any] = None) -> CustomController:
    """Return the controller to use for a tool call.

    With ``settings.browser.keep_open`` enabled, the first controller created
    is cached in ``shared_controller_instance`` and returned on later calls
    (``ask_human_callback`` is then ignored). Otherwise a new controller is
    built every time.

    A failure while setting up the MCP client is logged and does not prevent
    the controller from being returned.
    """
    global shared_controller_instance

    cached = shared_controller_instance
    if settings.browser.keep_open and cached:
        return cached

    new_controller = CustomController(ask_assistant_callback=ask_human_callback)

    raw_mcp_config = settings.server.mcp_config
    if raw_mcp_config:
        try:
            # The config may be supplied as a JSON string; decode it in that case.
            if isinstance(raw_mcp_config, str):
                parsed_mcp_config = json.loads(raw_mcp_config)
            else:
                parsed_mcp_config = raw_mcp_config
            await new_controller.setup_mcp_client(parsed_mcp_config)
        except Exception as e:
            logger.error(f"Failed to setup MCP client for controller: {e}")

    # Remember the controller for subsequent calls when resources are kept open.
    if settings.browser.keep_open:
        shared_controller_instance = new_controller
    return new_controller


def _launch_browser_config() -> BrowserConfig:
    """Build the BrowserConfig for a browser that this server starts itself."""
    # Agent-tool overrides win over the general browser settings when set.
    headless_override = settings.agent_tool.headless
    security_override = settings.agent_tool.disable_security
    return BrowserConfig(
        headless=headless_override if headless_override is not None else settings.browser.headless,
        disable_security=security_override if security_override is not None else settings.browser.disable_security,
        browser_binary_path=settings.browser.binary_path,
        user_data_dir=settings.browser.user_data_dir,
        window_width=settings.browser.window_width,
        window_height=settings.browser.window_height,
    )


def _browser_context_config(force_new_context: Optional[bool] = None) -> CustomBrowserContextConfig:
    """Build the context config; ``force_new_context`` is passed only when given."""
    options: Dict[str, Any] = {
        "trace_path": settings.browser.trace_path,
        "save_downloads_path": settings.paths.downloads,
        "save_recording_path": settings.agent_tool.save_recording_path if settings.agent_tool.enable_recording else None,
    }
    if force_new_context is not None:
        options["force_new_context"] = force_new_context
    return CustomBrowserContextConfig(**options)


async def _close_browser_resource(label: str, resource: Any) -> None:
    """Close ``resource`` if present; log (rather than raise) when closing fails."""
    if resource is None:
        return
    try:
        await resource.close()
    except Exception as close_err:
        logger.warning(f"Failed to close {label}: {close_err}")


async def _open_browser_and_context(
    browser_cfg: BrowserConfig,
    context_cfg: CustomBrowserContextConfig,
    *,
    close_browser_on_failure: bool,
) -> tuple[CustomBrowser, CustomBrowserContext]:
    """Create a browser plus one context, not leaking the browser if the context fails."""
    browser = CustomBrowser(config=browser_cfg)
    try:
        context = await browser.new_context(config=context_cfg)
    except Exception:
        if close_browser_on_failure:
            await _close_browser_resource("browser after failed context creation", browser)
        raise
    return browser, context


async def get_browser_and_context() -> tuple[CustomBrowser, CustomBrowserContext]:
    """
    Manages creation/reuse of CustomBrowser and CustomBrowserContext
    based on settings.browser.keep_open and settings.browser.use_own_browser.

    Three modes, checked in this order:

    * ``use_own_browser`` with a ``cdp_url``: attach to that browser over CDP
      and open a context on it (nothing is cached).
    * ``keep_open``: reuse the shared browser/context while the browser reports
      it is connected; otherwise discard them and create a new shared pair.
    * otherwise: launch a new browser and context for this call.

    Returns:
        The ``(browser, context)`` pair to use.

    Raises:
        RuntimeError: If either object ends up falsy.
        Exception: Whatever browser/context creation raises.
    """
    global shared_browser_instance, shared_context_instance

    current_browser: Optional[CustomBrowser] = None
    current_context: Optional[CustomBrowserContext] = None

    if settings.browser.use_own_browser and settings.browser.cdp_url:
        logger.info(f"Connecting to own browser via CDP: {settings.browser.cdp_url}")
        # Headless, binary_path etc. are controlled by the user-launched browser.
        browser_cfg = BrowserConfig(
            cdp_url=settings.browser.cdp_url,
            wss_url=settings.browser.wss_url,
            user_data_dir=settings.browser.user_data_dir,
        )
        # The browser is not ours, so it is never closed from here.
        current_browser, current_context = await _open_browser_and_context(
            browser_cfg, _browser_context_config(), close_browser_on_failure=False
        )

    elif settings.browser.keep_open:
        cached_browser = shared_browser_instance
        cached_context = shared_context_instance
        if cached_browser and cached_context and cached_browser.is_connected():
            logger.info("Reusing shared browser and context.")
            current_browser, current_context = cached_browser, cached_context
        else:
            if cached_browser or cached_context:
                logger.warning("Shared browser was disconnected. Recreating.")
                # Forget the stale pair first, then close best-effort (context
                # before browser): a dead browser may fail to close, and that
                # must not block creating the replacement.
                shared_browser_instance = None
                shared_context_instance = None
                await _close_browser_resource("stale shared context", cached_context)
                await _close_browser_resource("stale shared browser", cached_browser)

            logger.info("Creating new shared browser and context.")
            current_browser, current_context = await _open_browser_and_context(
                _launch_browser_config(),
                _browser_context_config(force_new_context=False),  # Important for shared context
                close_browser_on_failure=True,
            )
            # Publish only once both exist, so the shared state is never half set.
            shared_browser_instance = current_browser
            shared_context_instance = current_context

    else:
        # Create new resources per call (not using own browser, not keeping open)
        logger.info("Creating new browser and context for this call.")
        current_browser, current_context = await _open_browser_and_context(
            _launch_browser_config(),
            _browser_context_config(force_new_context=True),
            close_browser_on_failure=True,
        )

    if not current_browser or not current_context:
        raise RuntimeError("Failed to initialize browser or context")

    return current_browser, current_context


def serve() -> FastMCP:
    """Create the FastMCP server and register its tools.

    Two tools are registered: ``run_browser_agent`` and ``run_deep_research``.
    Both return a string and convert any exception into ``"Error: <message>"``
    after logging it.

    Returns:
        The configured ``FastMCP`` instance (not yet running).
    """
    server = FastMCP("mcp_server_browser_use")

    async def _close_quietly(label: str, closer: Any) -> None:
        # Await one teardown coroutine function; log instead of raising so a
        # failed close neither skips later cleanup nor replaces the tool result.
        try:
            await closer()
        except Exception as close_err:
            logger.warning(f"Failed to close {label}: {close_err}")

    @server.tool()
    async def run_browser_agent(ctx: Context, task: str) -> str:
        # Runs one BrowserUseAgent task and returns its final result text.
        # NOTE(review): documented with comments on purpose; a docstring here
        # would presumably be published as the MCP tool description - confirm
        # before adding one.
        logger.info(f"Received run_browser_agent task: {task[:100]}...")
        agent_task_id = str(uuid.uuid4())
        final_result = "Error: Agent execution failed."

        browser_instance: Optional[CustomBrowser] = None
        context_instance: Optional[CustomBrowserContext] = None
        controller_instance: Optional[CustomController] = None

        try:
            async with resource_lock: # Protect shared resource access/creation
                browser_instance, context_instance = await get_browser_and_context()
                # For server, ask_human_callback is likely not interactive, can be None or a placeholder
                controller_instance = await get_controller(ask_human_callback=None)

            if not browser_instance or not context_instance or not controller_instance:
                raise RuntimeError("Failed to acquire browser resources or controller.")

            main_llm_config = settings.get_llm_config()
            main_llm = internal_llm_provider.get_llm_model(**main_llm_config)

            # The planner LLM is optional: only built when both provider and model are set.
            planner_llm = None
            if settings.llm.planner_provider and settings.llm.planner_model_name:
                planner_llm_config = settings.get_llm_config(is_planner=True)
                planner_llm = internal_llm_provider.get_llm_model(**planner_llm_config)

            # History goes to <history_path>/<task id>/<task id>.json when configured.
            agent_history_json_file = None
            task_history_base_path = settings.agent_tool.history_path

            if task_history_base_path:
                task_specific_history_dir = Path(task_history_base_path) / agent_task_id
                task_specific_history_dir.mkdir(parents=True, exist_ok=True)
                agent_history_json_file = str(task_specific_history_dir / f"{agent_task_id}.json")
                logger.info(f"Agent history will be saved to: {agent_history_json_file}")

            agent_instance = BrowserUseAgent(
                task=task,
                llm=main_llm,
                browser=browser_instance,
                browser_context=context_instance,
                controller=controller_instance,
                planner_llm=planner_llm,
                max_actions_per_step=settings.agent_tool.max_actions_per_step,
                use_vision=settings.agent_tool.use_vision,
            )

            history: AgentHistoryList = await agent_instance.run(max_steps=settings.agent_tool.max_steps)

            if agent_history_json_file:
                agent_instance.save_history(agent_history_json_file)

            final_result = history.final_result() or "Agent finished without a final result."
            logger.info(f"Agent task completed. Result: {final_result[:100]}...")

        except Exception as e:
            logger.error(f"Error in run_browser_agent: {e}\n{traceback.format_exc()}")
            final_result = f"Error: {e}"
        finally:
            # Mirror the branch selection in get_browser_and_context(): a
            # browser is launched for this call alone exactly when we are not
            # attached over CDP and not keeping resources open. Checking only
            # use_own_browser (without cdp_url) would leak such a browser.
            attached_via_cdp = bool(settings.browser.use_own_browser and settings.browser.cdp_url)
            launched_for_this_call = not attached_via_cdp and not settings.browser.keep_open
            if launched_for_this_call:
                logger.info("Closing browser resources for this call.")
                if context_instance:
                    await _close_quietly("browser context", context_instance.close)
                if browser_instance:
                    await _close_quietly("browser", browser_instance.close)
            # Close the controller's MCP client unless it is the shared controller.
            if controller_instance and controller_instance is not shared_controller_instance:
                await _close_quietly("MCP client", controller_instance.close_mcp_client)
        return final_result

    @server.tool()
    async def run_deep_research(
        ctx: Context,
        research_task: str,
        max_parallel_browsers_override: Optional[int] = None,
    ) -> str:
        # Runs one DeepResearchAgent task and returns the report text (or a
        # diagnostic / "Error: ..." string). Comments are used instead of a
        # docstring for the same reason as in run_browser_agent.
        logger.info(f"Received run_deep_research task: {research_task[:100]}...")
        task_id = str(uuid.uuid4()) # This task_id is used for the sub-directory name
        report_content = "Error: Deep research failed."

        try:
            main_llm_config = settings.get_llm_config() # Deep research uses main LLM config
            research_llm = internal_llm_provider.get_llm_model(**main_llm_config)

            # Prepare browser_config dict for DeepResearchAgent's sub-agents
            dr_browser_cfg = {
                "headless": settings.browser.headless, # Use general browser headless for sub-tasks
                "disable_security": settings.browser.disable_security,
                "browser_binary_path": settings.browser.binary_path,
                "user_data_dir": settings.browser.user_data_dir,
                "window_width": settings.browser.window_width,
                "window_height": settings.browser.window_height,
                "trace_path": settings.browser.trace_path, # For sub-agent traces
                "save_downloads_path": settings.paths.downloads, # For sub-agent downloads
            }
            if settings.browser.use_own_browser and settings.browser.cdp_url:
                # If main browser is CDP, sub-agents should also use it
                dr_browser_cfg["cdp_url"] = settings.browser.cdp_url
                dr_browser_cfg["wss_url"] = settings.browser.wss_url

            mcp_server_config_for_agent = None
            if settings.server.mcp_config:
                mcp_server_config_for_agent = settings.server.mcp_config
                if isinstance(mcp_server_config_for_agent, str):
                    # The config may arrive as a JSON string.
                    mcp_server_config_for_agent = json.loads(mcp_server_config_for_agent)

            agent_instance = DeepResearchAgent(
                llm=research_llm,
                browser_config=dr_browser_cfg,
                mcp_server_config=mcp_server_config_for_agent,
            )

            current_max_parallel_browsers = max_parallel_browsers_override if max_parallel_browsers_override is not None else settings.research_tool.max_parallel_browsers

            # Check if save_dir is provided, otherwise use in-memory approach
            save_dir_for_this_task = None
            if settings.research_tool.save_dir:
                # If save_dir is provided, construct the full save directory path for this specific task
                save_dir_for_this_task = str(Path(settings.research_tool.save_dir) / task_id)
                logger.info(f"Deep research save directory for this task: {save_dir_for_this_task}")
            else:
                logger.info("No save_dir configured. Deep research will operate in memory-only mode.")

            logger.info(f"Using max_parallel_browsers: {current_max_parallel_browsers}")

            result_dict = await agent_instance.run(
                topic=research_task,
                save_dir=save_dir_for_this_task, # Can be None now
                task_id=task_id, # Pass the generated task_id
                max_parallel_browsers=current_max_parallel_browsers
            )

            # Handle the result based on if files were saved or not
            report_file_path = result_dict.get("report_file_path")
            if save_dir_for_this_task and report_file_path and Path(report_file_path).exists():
                with open(report_file_path, "r", encoding="utf-8") as f:
                    markdown_content = f.read()
                report_content = f"Deep research report generated successfully at {report_file_path}\n\n{markdown_content}"
                logger.info(f"Deep research task {task_id} completed. Report at {report_file_path}")
            elif result_dict.get("status") == "completed" and result_dict.get("final_report"):
                # No usable report file: fall back to the report text in the result.
                report_content = f"Deep research completed. Report content:\n\n{result_dict['final_report']}"
                if report_file_path:
                    report_content += f"\n(Expected report file at: {report_file_path})"
                logger.info(f"Deep research task {task_id} completed. Report content retrieved directly.")
            else:
                report_content = f"Deep research task {task_id} result: {result_dict}. Report file not found or content not available."
                logger.warning(report_content)

        except Exception as e:
            logger.error(f"Error in run_deep_research: {e}\n{traceback.format_exc()}")
            report_content = f"Error: {e}"

        return report_content

    return server

# Built once at import time and run by main(); named 'server_instance' rather
# than 'server' to avoid confusion with 'settings.server'.
server_instance = serve()

def main():
    """Log the effective configuration, then hand control to ``server_instance.run()``.

    If reading the research-tool configuration raises, the error is logged and
    the function returns without starting the server.
    """
    logger.info("Starting MCP server for browser-use...")
    try:
        # Only report the research save directory here; nothing is created or validated.
        research_save_dir = settings.research_tool.save_dir
        if not research_save_dir:
            logger.info("Research tool save directory not configured. Deep research will operate in memory-only mode.")
        else:
            logger.info(f"Research tool save directory configured: {research_save_dir}")
    except Exception as e:
        logger.error(f"Configuration error: {e}")
        return  # Abort start-up when the configuration cannot be read

    llm_settings = settings.llm
    logger.info(f"Loaded settings with LLM provider: {llm_settings.provider}, Model: {llm_settings.model_name}")

    browser_settings = settings.browser
    logger.info(
        f"Browser keep_open: {browser_settings.keep_open}, Use own browser: {browser_settings.use_own_browser}"
    )
    if browser_settings.use_own_browser:
        logger.info(f"Connecting to own browser via CDP: {browser_settings.cdp_url}")

    server_instance.run()

# Script entry point: start the server only when this file is executed directly.
if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/mcp_server_browser_use/_internal/agent/deep_research/deep_research_agent.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import logging
import os
import pdb
import uuid
from pathlib import Path
from typing import List, Dict, Any, TypedDict, Optional, Sequence, Annotated
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Langchain imports
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import Tool, StructuredTool
from langchain.agents import AgentExecutor  # We might use parts, but Langgraph is primary
from langchain_community.tools.file_management import WriteFileTool, ReadFileTool, CopyFileTool, ListDirectoryTool, \
    MoveFileTool, FileSearchTool
from langchain_openai import ChatOpenAI  # Replace with your actual LLM import
from pydantic import BaseModel, Field
import operator

from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextWindowSize

# Langgraph imports
from langgraph.graph import StateGraph, END
from ...controller.custom_controller import CustomController
from ...utils import llm_provider
from ...browser.custom_browser import CustomBrowser
from ...browser.custom_context import CustomBrowserContext, CustomBrowserContextConfig
from ...agent.browser_use.browser_use_agent import BrowserUseAgent
from ...utils.mcp_client import setup_mcp_client_and_tools

logger = logging.getLogger(__name__)

# Constants
# File names joined onto a task's output directory by the save/load helpers below.
REPORT_FILENAME = "report.md"
PLAN_FILENAME = "research_plan.md"
SEARCH_INFO_FILENAME = "search_info.json"

# Module-level registries shared by the functions in this file.
# task_id -> stop event; read with .get(task_id) and checked via .is_set() before a tool is executed.
_AGENT_STOP_FLAGS = {}
# "<task_id>_<uuid4>" -> BrowserUseAgent instance; entries are added and removed by run_single_browser_task.
_BROWSER_AGENT_INSTANCES = {}


async def run_single_browser_task(
        task_query: str,
        task_id: str,
        llm: Any,  # Pass the main LLM
        browser_config: Dict[str, Any],
        stop_event: threading.Event,
        use_vision: bool = False,
) -> Dict[str, Any]:
    """
    Runs a single BrowserUseAgent task.
    Manages browser creation and closing for this specific task.

    Args:
        task_query: The research query handed to the browser agent.
        task_id: Identifier of the owning research task; used as the prefix of
            the key under which the agent is registered in
            ``_BROWSER_AGENT_INSTANCES``.
        llm: LLM instance passed through to ``BrowserUseAgent``.
        browser_config: Browser options. Keys read here: ``headless``,
            ``window_width``, ``window_height``, ``user_data_dir``,
            ``use_own_browser``, ``browser_binary_path``, ``wss_url``,
            ``cdp_url``, ``disable_security``, ``save_downloads_path``,
            ``trace_path``.
        stop_event: Checked before the agent starts and again after it
            finishes to report ``cancelled`` / ``stopped``.
        use_vision: Forwarded to ``BrowserUseAgent``.

    Returns:
        A dict that always contains ``query``. On success or stop it also has
        ``result`` and ``status`` (``completed``, ``stopped`` or
        ``cancelled``); on failure it has ``error`` and ``status == "failed"``.
        Exceptions raised while running are caught and reported this way.
    """
    if not BrowserUseAgent:
        return {"query": task_query, "error": "BrowserUseAgent components not available."}

    # --- Browser Setup ---
    # These should ideally come from the main agent's config
    headless = browser_config.get("headless", False)
    window_w = browser_config.get("window_width", 1280)
    window_h = browser_config.get("window_height", 1100)
    browser_user_data_dir = browser_config.get("user_data_dir", None)
    use_own_browser = browser_config.get("use_own_browser", False)
    browser_binary_path = browser_config.get("browser_binary_path", None)
    wss_url = browser_config.get("wss_url", None)
    cdp_url = browser_config.get("cdp_url", None)
    disable_security = browser_config.get("disable_security", False)
    save_downloads_path = browser_config.get("save_downloads_path", None)
    trace_path = browser_config.get("trace_path", None)

    bu_browser = None
    bu_browser_context = None
    # Must be bound before the try block: the finally clause reads it, and a
    # failure during browser/context/agent setup happens before registration.
    task_key = None
    try:
        logger.info(f"Starting browser task for query: {task_query}")
        extra_args = [f"--window-size={window_w},{window_h}"]
        if browser_user_data_dir:
            extra_args.append(f"--user-data-dir={browser_user_data_dir}")
        if use_own_browser:
            # Environment variables take precedence over the configured values.
            browser_binary_path = os.getenv("CHROME_PATH", None) or browser_binary_path
            if browser_binary_path == "":
                browser_binary_path = None
            chrome_user_data = os.getenv("CHROME_USER_DATA", None)
            if chrome_user_data:
                extra_args += [f"--user-data-dir={chrome_user_data}"]
        else:
            browser_binary_path = None

        bu_browser = CustomBrowser(
            config=BrowserConfig(
                headless=headless,
                disable_security=disable_security,
                browser_binary_path=browser_binary_path,
                extra_browser_args=extra_args,
                wss_url=wss_url,
                cdp_url=cdp_url,
            )
        )

        context_config = CustomBrowserContextConfig(
            save_downloads_path=save_downloads_path,
            trace_path=trace_path,
            browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h),
            force_new_context=True
        )
        bu_browser_context = await bu_browser.new_context(config=context_config)

        # Simple controller example, replace with your actual implementation if needed
        bu_controller = CustomController()

        # Construct the task prompt for BrowserUseAgent
        # Instruct it to find specific info and return title/URL
        bu_task_prompt = f"""
        Research Task: {task_query}
        Objective: Find relevant information answering the query.
        Output Requirements: For each relevant piece of information found, please provide:
        1. A concise summary of the information.
        2. The title of the source page or document.
        3. The URL of the source.
        Focus on accuracy and relevance. Avoid irrelevant details.
        PDF cannot directly extract _content, please try to download first, then using read_file, if you can't save or read, please try other methods.
        """

        bu_agent_instance = BrowserUseAgent(
            task=bu_task_prompt,
            llm=llm,  # Use the passed LLM
            browser=bu_browser,
            browser_context=bu_browser_context,
            controller=bu_controller,
            use_vision=use_vision,
        )

        # Store instance for potential stop() call
        task_key = f"{task_id}_{uuid.uuid4()}"
        _BROWSER_AGENT_INSTANCES[task_key] = bu_agent_instance

        # --- Run with Stop Check ---
        # The stop signal is only observed before the run starts and after it
        # returns; the run itself is awaited to completion.
        if stop_event.is_set():
            logger.info(f"Browser task for '{task_query}' cancelled before start.")
            return {"query": task_query, "result": None, "status": "cancelled"}

        logger.info(f"Running BrowserUseAgent for: {task_query}")
        result = await bu_agent_instance.run()  # Assuming run is the main method
        logger.info(f"BrowserUseAgent finished for: {task_query}")

        final_data = result.final_result()

        if stop_event.is_set():
            logger.info(f"Browser task for '{task_query}' stopped during execution.")
            return {"query": task_query, "result": final_data, "status": "stopped"}
        else:
            logger.info(f"Browser result for '{task_query}': {final_data}")
            return {"query": task_query, "result": final_data, "status": "completed"}

    except Exception as e:
        logger.error(f"Error during browser task for query '{task_query}': {e}", exc_info=True)
        return {"query": task_query, "error": str(e), "status": "failed"}
    finally:
        # Cleanup is best-effort: a failure to close is logged, never raised,
        # so it cannot replace the result dict being returned above.
        if bu_browser_context:
            try:
                await bu_browser_context.close()
                bu_browser_context = None
                logger.info("Closed browser context.")
            except Exception as e:
                logger.error(f"Error closing browser context: {e}")
        if bu_browser:
            try:
                await bu_browser._close_without_httpxclients()
                bu_browser = None
                logger.info("Closed browser.")
            except Exception as e:
                logger.error(f"Error closing browser: {e}")

        # Deregister the agent only if it was registered in the first place.
        if task_key is not None:
            _BROWSER_AGENT_INSTANCES.pop(task_key, None)


class BrowserSearchInput(BaseModel):
    # Argument schema for the "parallel_browser_search" tool.
    # NOTE(review): documented with a comment rather than a class docstring so
    # the schema generated from this model is left exactly as it was.
    queries: List[str] = Field(
        description="List of distinct search queries to find information relevant to the research task.",
    )


async def _run_browser_search_tool(
        queries: List[str],
        task_id: str,  # Injected dependency
        llm: Any,  # Injected dependency
        browser_config: Dict[str, Any],
        stop_event: threading.Event,
        max_parallel_browsers: int = 1
) -> List[Dict[str, Any]]:
    """
    Internal function to execute parallel browser searches based on LLM-provided queries.
    Handles concurrency and stop signals.

    Args:
        queries: Search queries proposed by the LLM. Only the first
            ``max_parallel_browsers`` entries are executed; the rest are dropped.
        task_id: Identifier of the owning research task (used in log lines and
            forwarded to ``run_single_browser_task``).
        llm: LLM instance forwarded to each browser task.
        browser_config: Browser options forwarded to each browser task.
        stop_event: When set, queries that have not started yet are reported
            as ``cancelled`` instead of being run.
        max_parallel_browsers: Upper bound on queries run per call. Values
            below 1 are treated as 1.

    Returns:
        One result dict per executed query, in the same order as the queries.
        Exceptions escaping a task are converted into
        ``{"query", "error", "status": "failed"}`` entries.
    """
    # A non-positive limit would silently drop queries (slice) or make
    # asyncio.Semaphore raise ValueError, so fall back to a single browser.
    max_parallel_browsers = max(1, max_parallel_browsers)

    # Limit queries just in case LLM ignores the description
    queries = queries[:max_parallel_browsers]
    logger.info(f"[Browser Tool {task_id}] Running search for {len(queries)} queries: {queries}")

    semaphore = asyncio.Semaphore(max_parallel_browsers)

    async def task_wrapper(query):
        async with semaphore:
            if stop_event.is_set():
                logger.info(f"[Browser Tool {task_id}] Skipping task due to stop signal: {query}")
                return {"query": query, "result": None, "status": "cancelled"}
            # Pass necessary injected configs and the stop event
            return await run_single_browser_task(
                query,
                task_id,
                llm,  # Pass the main LLM (or a dedicated one if needed)
                browser_config,
                stop_event
                # use_vision could be added here if needed
            )

    # return_exceptions=True keeps one failing query from discarding the others.
    search_results = await asyncio.gather(
        *(task_wrapper(query) for query in queries),
        return_exceptions=True,
    )

    processed_results = []
    for query, res in zip(queries, search_results):
        if isinstance(res, Exception):
            # We are outside an ``except`` block, so the exception object must be
            # passed explicitly for its traceback to reach the log.
            logger.error(f"[Browser Tool {task_id}] Gather caught exception for query '{query}': {res}", exc_info=res)
            processed_results.append({"query": query, "error": str(res), "status": "failed"})
        elif isinstance(res, dict):
            processed_results.append(res)
        else:
            logger.error(f"[Browser Tool {task_id}] Unexpected result type for query '{query}': {type(res)}")
            processed_results.append({"query": query, "error": "Unexpected result type", "status": "failed"})

    logger.info(f"[Browser Tool {task_id}] Finished search. Results count: {len(processed_results)}")
    return processed_results


def create_browser_search_tool(
        llm: Any,
        browser_config: Dict[str, Any],
        task_id: str,
        stop_event: threading.Event,
        max_parallel_browsers: int = 1,
) -> StructuredTool:
    """Build the ``parallel_browser_search`` tool with its runtime dependencies pre-bound.

    Everything except ``queries`` is fixed here, so the only argument left for
    the tool caller to supply is the one described by ``BrowserSearchInput``.
    """
    from functools import partial

    # Dependencies that are not part of the LLM-facing call signature.
    injected = {
        "task_id": task_id,
        "llm": llm,
        "browser_config": browser_config,
        "stop_event": stop_event,
        "max_parallel_browsers": max_parallel_browsers,
    }
    search_coroutine = partial(_run_browser_search_tool, **injected)

    tool_description = f"""Use this tool to actively search the web for information related to a specific research task or question.
It runs up to {max_parallel_browsers} searches in parallel using a browser agent for better results than simple scraping.
Provide a list of distinct search queries(up to {max_parallel_browsers}) that are likely to yield relevant information."""

    return StructuredTool.from_function(
        coroutine=search_coroutine,
        name="parallel_browser_search",
        description=tool_description,
        args_schema=BrowserSearchInput,
    )


# --- Langgraph State Definition ---

class ResearchPlanItem(TypedDict):
    """One entry of the research plan tracked in ``DeepResearchState``."""
    # Step number assigned when the plan is generated or reloaded; used in log and prompt text.
    step: int
    # Text of the research task or question for this step.
    task: str
    status: str  # "pending", "completed", "failed"
    queries: Optional[List[str]]  # Queries generated for this task
    result_summary: Optional[str]  # Optional brief summary after execution


class DeepResearchState(TypedDict):
    """State dictionary read and partially updated by the Langgraph nodes in this file."""
    # Identifier of the research task; also the key used to look up its stop event.
    task_id: str
    # Research topic the plan and the final report are about.
    topic: str
    # Ordered plan items; nodes update each item's status as steps are executed.
    research_plan: List[ResearchPlanItem]
    search_results: List[Dict[str, Any]]  # Stores results from browser_search_tool_func
    # messages: Sequence[BaseMessage] # History for ReAct-like steps within nodes
    llm: Any  # The LLM instance
    # Tools bound to the LLM in the research execution node and looked up by name.
    tools: List[Tool]
    # Directory for plan/results/report files. Nodes test it for truthiness
    # before saving, so a falsy value skips all file output.
    output_dir: Path
    browser_config: Dict[str, Any]
    final_report: Optional[str]
    current_step_index: int  # To track progress through the plan
    stop_requested: bool  # Flag to signal termination
    # Add other state variables as needed
    error_message: Optional[str]  # To store errors

    # Conversation carried across research steps: task prompts, AI responses and tool messages.
    messages: List[BaseMessage]


# --- Langgraph Nodes ---

def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]:
    """Rebuild partial state from the plan and search-result files in ``output_dir``.

    The plan file is read as a markdown checklist: every line starting with
    ``- [x]`` or ``- [ ]`` becomes a plan item, numbered from 1 in file order.
    ``current_step_index`` is set to the position of the first pending item,
    or to the plan length when none is pending. Read or parse failures are
    logged and reported through ``error_message``. ``task_id`` is accepted but
    not used here.

    Returns:
        A dict holding only the keys that could be restored; empty when
        neither file exists.
    """
    restored: Dict[str, Any] = {}
    plan_file = os.path.join(output_dir, PLAN_FILENAME)
    search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME)

    if os.path.exists(plan_file):
        try:
            with open(plan_file, 'r', encoding='utf-8') as plan_handle:
                # Keep only checklist lines; headings and blank lines are ignored.
                checklist_lines = [
                    text for text in (raw.strip() for raw in plan_handle)
                    if text.startswith(("- [x]", "- [ ]"))
                ]
                restored_plan = [
                    ResearchPlanItem(
                        step=number,
                        task=text[5:].strip(),  # drop the 5-character checkbox marker
                        status="completed" if text.startswith("- [x]") else "pending",
                        queries=None,
                        result_summary=None,
                    )
                    for number, text in enumerate(checklist_lines, start=1)
                ]
                restored['research_plan'] = restored_plan

                # Resume at the first pending item, or past the end if all are done.
                resume_index = len(restored_plan)
                for position, entry in enumerate(restored_plan):
                    if entry['status'] == 'pending':
                        resume_index = position
                        break
                restored['current_step_index'] = resume_index
                logger.info(f"Loaded research plan from {plan_file}, next step index: {resume_index}")
        except Exception as exc:
            logger.error(f"Failed to load or parse research plan {plan_file}: {exc}")
            restored['error_message'] = f"Failed to load research plan: {exc}"

    if os.path.exists(search_file):
        try:
            with open(search_file, 'r', encoding='utf-8') as results_handle:
                restored['search_results'] = json.load(results_handle)
                logger.info(f"Loaded search results from {search_file}")
        except Exception as exc:
            logger.error(f"Failed to load search results {search_file}: {exc}")
            restored['error_message'] = f"Failed to load search results: {exc}"
            # Decide if this is fatal or if we can continue without old results

    return restored


def _save_plan_to_md(plan: List[ResearchPlanItem], output_dir: str):
    """Write ``plan`` to the plan file in ``output_dir`` as a markdown checklist.

    Completed items are written with ``- [x]``; every other status gets
    ``- [ ]``. Failures are logged and not raised.
    """
    plan_file = os.path.join(output_dir, PLAN_FILENAME)

    def checklist_line(entry):
        checkbox = "- [ ]"
        if entry['status'] == 'completed':
            checkbox = "- [x]"
        return f"{checkbox} {entry['task']}\n"

    try:
        with open(plan_file, 'w', encoding='utf-8') as out:
            out.write("# Research Plan\n\n")
            # Lazily generated so lines are written one by one, in plan order.
            out.writelines(checklist_line(entry) for entry in plan)
        logger.info(f"Research plan saved to {plan_file}")
    except Exception as exc:
        logger.error(f"Failed to save research plan to {plan_file}: {exc}")


def _save_search_results_to_json(results: List[Dict[str, Any]], output_dir: str):
    """Overwrite the search-results file in ``output_dir`` with ``results`` as JSON.

    The whole list is rewritten on every call (no appending). Output is
    indented by two spaces and keeps non-ASCII characters unescaped. Failures
    are logged and not raised.
    """
    search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME)
    try:
        with open(search_file, mode='w', encoding='utf-8') as out:
            json.dump(results, out, ensure_ascii=False, indent=2)
    except Exception as exc:
        logger.error(f"Failed to save search results to {search_file}: {exc}")
    else:
        logger.info(f"Search results saved to {search_file}")


def _save_report_to_md(report: str, output_dir: Path):
    """Write the final report text to the report file in ``output_dir``.

    Any existing file is overwritten. Failures are logged and not raised.
    """
    report_file = os.path.join(output_dir, REPORT_FILENAME)
    try:
        with open(report_file, mode='w', encoding='utf-8') as out:
            out.write(report)
    except Exception as exc:
        logger.error(f"Failed to save final report to {report_file}: {exc}")
    else:
        logger.info(f"Final report saved to {report_file}")


async def planning_node(state: DeepResearchState) -> Dict[str, Any]:
    """Generates the initial research plan or refines it if resuming.

    Behaviour:
        * If a stop was requested, nothing is planned.
        * If the state already holds a plan and ``current_step_index`` is
          greater than zero, that plan is reused (and re-saved when an output
          directory is set).
        * Otherwise the LLM is asked for a numbered plan, which is parsed line
          by line into ``ResearchPlanItem`` entries.

    Returns:
        A partial state update: ``research_plan`` (plus ``current_step_index``
        and an empty ``search_results`` for a new plan), ``stop_requested``,
        or ``error_message`` when the LLM call fails or yields no usable plan.
    """
    logger.info("--- Entering Planning Node ---")
    if state.get('stop_requested'):
        logger.info("Stop requested, skipping planning.")
        return {"stop_requested": True}

    llm = state['llm']
    topic = state['topic']
    existing_plan = state.get('research_plan')
    output_dir = state['output_dir']

    if existing_plan and state.get('current_step_index', 0) > 0:
        logger.info("Resuming with existing plan.")
        # Maybe add logic here to let LLM review and potentially adjust the plan
        # based on existing results, but for now, we just use the loaded plan.
        if output_dir:
            _save_plan_to_md(existing_plan, output_dir)  # Ensure it's saved initially if output_dir exists
        return {"research_plan": existing_plan}  # Return the loaded plan

    logger.info(f"Generating new research plan for topic: {topic}")

    # The topic is supplied through the template variable rather than being
    # interpolated into the template text: a topic containing "{" or "}" would
    # otherwise be parsed as template syntax and break prompt formatting.
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a meticulous research assistant. Your goal is to create a step-by-step research plan to thoroughly investigate a given topic.
        The plan should consist of clear, actionable research tasks or questions. Each step should logically build towards a comprehensive understanding.
        Format the output as a numbered list. Each item should represent a distinct research step or question.
        Example:
        1. Define the core concepts and terminology related to [Topic].
        2. Identify the key historical developments of [Topic].
        3. Analyze the current state-of-the-art and recent advancements in [Topic].
        4. Investigate the major challenges and limitations associated with [Topic].
        5. Explore the future trends and potential applications of [Topic].
        6. Summarize the findings and draw conclusions.

        Keep the plan focused and manageable. Aim for 5-10 detailed steps.
        """),
        ("human", "Generate a research plan for the topic: {topic}")
    ])

    try:
        response = await llm.ainvoke(prompt.format_prompt(topic=topic).to_messages())
        plan_text = response.content

        # Parse the numbered list into the plan structure
        new_plan: List[ResearchPlanItem] = []
        for line in plan_text.strip().split('\n'):
            line = line.strip()
            if line and (line[0].isdigit() or line.startswith(("*", "-"))):
                # Simple parsing: remove number/bullet and space
                task_text = line.split('.', 1)[-1].strip() if line[0].isdigit() else line[1:].strip()
                if task_text:
                    new_plan.append(ResearchPlanItem(
                        # Number by accepted item, not by raw line, so blank or
                        # introductory lines do not leave gaps in the numbering.
                        step=len(new_plan) + 1,
                        task=task_text,
                        status="pending",
                        queries=None,
                        result_summary=None
                    ))

        if not new_plan:
            logger.error("LLM failed to generate a valid plan structure.")
            return {"error_message": "Failed to generate research plan structure."}

        logger.info(f"Generated research plan with {len(new_plan)} steps.")
        if output_dir:
            _save_plan_to_md(new_plan, output_dir)

        return {
            "research_plan": new_plan,
            "current_step_index": 0,  # Start from the beginning
            "search_results": [],  # Initialize search results
        }

    except Exception as e:
        logger.error(f"Error during planning: {e}", exc_info=True)
        return {"error_message": f"LLM Error during planning: {e}"}


async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]:
    """
    Executes the next step in the research plan by invoking the LLM with tools.
    The LLM decides which tool (e.g., browser search) to use and provides arguments.

    For the plan item at ``current_step_index`` this node:
        1. sends the task to the tool-bound LLM (together with the message
           history, if any),
        2. executes every tool call the LLM requested,
        3. appends ``parallel_browser_search`` output to ``search_results``,
        4. marks the step ``completed`` or ``failed`` and, when an output
           directory is set, saves the plan and the search results.

    A step counts as failed when a requested tool was missing, a tool raised,
    a tool message contains ``"Error:"``, or the browser search tool was not
    called at all.

    Returns:
        A partial state update. The normal path returns ``research_plan``,
        ``search_results``, ``current_step_index`` (advanced by one) and
        ``messages``. Early exits return a subset (``stop_requested``,
        ``error_message``, or an empty dict when the plan is exhausted).
    """
    logger.info("--- Entering Research Execution Node ---")
    if state.get('stop_requested'):
        logger.info("Stop requested, skipping research execution.")
        return {"stop_requested": True, "current_step_index": state['current_step_index']}  # Keep index same

    plan = state['research_plan']
    current_index = state['current_step_index']
    llm = state['llm']
    tools = state['tools']  # Tools are now passed in state
    output_dir = str(state['output_dir']) if state['output_dir'] else None
    task_id = state['task_id']
    # Stop event is bound inside the tool function, no need to pass directly here

    if not plan or current_index >= len(plan):
        logger.info("Research plan complete or empty.")
        # This condition should ideally be caught by `should_continue` before reaching here
        return {}

    current_step = plan[current_index]
    if current_step['status'] == 'completed':
        logger.info(f"Step {current_step['step']} already completed, skipping.")
        return {"current_step_index": current_index + 1}  # Move to next step

    logger.info(f"Executing research step {current_step['step']}: {current_step['task']}")

    # Bind tools to the LLM for this call
    llm_with_tools = llm.bind_tools(tools)

    # Treat a missing/None history like an empty one so the list concatenations
    # below cannot fail.
    previous_messages = state.get('messages') or []
    task_prompt = HumanMessage(
        content=f"Research Task (Step {current_step['step']}): {current_step['task']}")
    if previous_messages:
        current_task_message = [task_prompt]
    else:
        # First step of the conversation: prepend the system instructions.
        current_task_message = [
            SystemMessage(
                content="You are a research assistant executing one step of a research plan. Use the available tools, especially the 'parallel_browser_search' tool, to gather information needed for the current task. Be precise with your search queries if using the browser tool."),
            task_prompt,
        ]
    invocation_messages = previous_messages + current_task_message

    # Bound before the tool loop so it exists even when no tool call reaches
    # execution (e.g. every requested tool is unknown). The list held in state
    # is extended in place, as before.
    current_search_results = state.get('search_results')
    if current_search_results is None:
        current_search_results = []

    try:
        # Invoke the LLM, expecting it to make a tool call
        logger.info(f"Invoking LLM with tools for task: {current_step['task']}")
        ai_response: BaseMessage = await llm_with_tools.ainvoke(invocation_messages)
        logger.info("LLM invocation complete.")

        tool_results = []
        executed_tool_names = []
        tool_failed = False  # Set when a tool is missing or raises

        if not isinstance(ai_response, AIMessage) or not ai_response.tool_calls:
            # LLM didn't call a tool. Maybe it answered directly? Or failed?
            logger.warning(
                f"LLM did not call any tool for step {current_step['step']}. Response: {ai_response.content[:100]}...")
            # Mark as failed for now, assuming a tool was expected.
            current_step['status'] = 'failed'
            current_step['result_summary'] = "LLM did not use a tool as expected."
            if output_dir:
                _save_plan_to_md(plan, output_dir)
            return {
                "research_plan": plan,
                "current_step_index": current_index + 1,
                "error_message": f"LLM failed to call a tool for step {current_step['step']}."
            }

        # Process tool calls
        for tool_call in ai_response.tool_calls:
            tool_name = tool_call.get("name")
            tool_args = tool_call.get("args", {})
            tool_call_id = tool_call.get("id")  # Important for ToolMessage

            logger.info(f"LLM requested tool call: {tool_name} with args: {tool_args}")
            executed_tool_names.append(tool_name)

            # Find the corresponding tool instance
            selected_tool = next((t for t in tools if t.name == tool_name), None)

            if not selected_tool:
                logger.error(f"LLM called tool '{tool_name}' which is not available.")
                tool_failed = True
                # Create a ToolMessage indicating the error
                tool_results.append(ToolMessage(
                    content=f"Error: Tool '{tool_name}' not found.",
                    tool_call_id=tool_call_id
                ))
                continue  # Skip to next tool call if any

            # Execute the tool
            try:
                # Stop check before executing the tool (tool itself also checks)
                stop_event = _AGENT_STOP_FLAGS.get(task_id)
                if stop_event and stop_event.is_set():
                    logger.info(f"Stop requested before executing tool: {tool_name}")
                    current_step['status'] = 'pending'  # Not completed due to stop
                    if output_dir:
                        _save_plan_to_md(plan, output_dir)
                    return {"stop_requested": True, "research_plan": plan}

                logger.info(f"Executing tool: {tool_name}")
                # Assuming tool functions handle async correctly
                tool_output = await selected_tool.ainvoke(tool_args)
                logger.info(f"Tool '{tool_name}' executed successfully.")

                # Decide per call: only the browser tool's own output belongs in
                # the search results. Checking the names seen so far would also
                # merge the output of a later, unrelated tool.
                if tool_name == "parallel_browser_search":
                    current_search_results.extend(tool_output)
                else:  # Handle other tool outputs (e.g., file tools return strings)
                    # Just log it for now. Need better handling for diverse tool outputs.
                    logger.info(f"Result from tool '{tool_name}': {str(tool_output)[:200]}...")

                # Store result for potential next LLM call (if we were doing multi-turn)
                tool_results.append(ToolMessage(
                    content=json.dumps(tool_output),
                    tool_call_id=tool_call_id
                ))

            except Exception as e:
                logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True)
                tool_failed = True
                tool_results.append(ToolMessage(
                    content=f"Error executing tool {tool_name}: {e}",
                    tool_call_id=tool_call_id
                ))
                # Record the failure alongside the search results as well.
                current_search_results.append(
                    {"tool_name": tool_name, "args": tool_args, "status": "failed", "error": str(e)})

        # Basic check: Did the browser tool run at all? (More specific checks needed)
        browser_tool_called = "parallel_browser_search" in executed_tool_names
        # The explicit flag catches tool exceptions, whose message starts with
        # "Error executing tool" and therefore never matched the "Error:" text
        # check on its own. The text check is kept for the remaining cases.
        step_failed = (
            tool_failed
            or any("Error:" in str(tr.content) for tr in tool_results)
            or not browser_tool_called
        )

        if step_failed:
            logger.warning(f"Step {current_step['step']} failed or did not yield results via browser search.")
            current_step['status'] = 'failed'
            current_step[
                'result_summary'] = f"Tool execution failed or browser tool not used. Errors: {[tr.content for tr in tool_results if 'Error' in str(tr.content)]}"
        else:
            logger.info(f"Step {current_step['step']} completed using tool(s): {executed_tool_names}.")
            current_step['status'] = 'completed'

            current_step['result_summary'] = f"Executed tool(s): {', '.join(executed_tool_names)}."

        if output_dir:
            _save_plan_to_md(plan, output_dir)
            _save_search_results_to_json(current_search_results, output_dir)

        return {
            "research_plan": plan,
            "search_results": current_search_results,  # Update with new results
            "current_step_index": current_index + 1,
            "messages": invocation_messages + [ai_response] + tool_results,
            # Optionally return the tool_results messages if needed by downstream nodes
        }

    except Exception as e:
        logger.error(f"Unhandled error during research execution node for step {current_step['step']}: {e}",
                     exc_info=True)
        current_step['status'] = 'failed'
        if output_dir:
            _save_plan_to_md(plan, output_dir)
        return {
            "research_plan": plan,
            "current_step_index": current_index + 1,  # Move on even if error?
            "error_message": f"Core Execution Error on step {current_step['step']}: {e}"
        }


async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]:
    """Synthesize the final Markdown report from the collected search results.

    Reads ``llm``, ``topic``, ``search_results``, ``output_dir`` and
    ``research_plan`` from ``state``, renders the findings and the plan into a
    prompt, and asks the LLM to write the report.

    Args:
        state: Current graph state.

    Returns:
        A partial state update, one of:
        - ``{"stop_requested": True}`` if a stop was requested;
        - ``{"final_report": <markdown>}`` on success, or a placeholder report
          when no search results were collected;
        - ``{"error_message": <text>}`` if prompt formatting or the LLM call
          raised.
    """
    logger.info("--- Entering Synthesis Node ---")
    if state.get('stop_requested'):
        logger.info("Stop requested, skipping synthesis.")
        return {"stop_requested": True}

    llm = state['llm']
    topic = state['topic']
    search_results = state.get('search_results', [])
    output_dir = state['output_dir']
    plan = state['research_plan']  # Include plan for context

    if not search_results:
        logger.warning("No search results found to synthesize report.")
        report = f"# Research Report: {topic}\n\nNo information was gathered during the research process."
        if output_dir:
            _save_report_to_md(report, output_dir)
        return {"final_report": report}

    logger.info(f"Synthesizing report from {len(search_results)} collected search result entries.")

    # NOTE(review): nothing in this function populates `references`, so the
    # "## References" section below is currently never appended.
    references = {}

    # Render each search entry as a small Markdown section. Entries that are
    # neither completed-with-data nor failed (e.g. cancelled) are left out.
    finding_sections = []
    for result_entry in search_results:
        query = result_entry.get('query', 'Unknown Query')
        status = result_entry.get('status', 'unknown')
        result_data = result_entry.get('result')
        error = result_entry.get('error')

        if status == 'completed' and result_data:
            finding_sections.append(
                f"### Finding from Query: \"{query}\"\n"
                f"- **Summary:**\n{result_data}\n"
                "---\n"
            )
        elif status == 'failed':
            finding_sections.append(
                f"### Failed Query: \"{query}\"\n"
                f"- **Error:** {error}\n"
                "---\n"
            )
    formatted_results = "".join(finding_sections)

    # Render the research plan as a checklist for context.
    plan_lines = ["\nResearch Plan Followed:\n"]
    for item in plan:
        if item['status'] == 'completed':
            marker = "- [x]"
        elif item['status'] == 'failed':
            marker = "- [ ] (Failed)"
        else:
            marker = "- [ ]"
        plan_lines.append(f"{marker} {item['task']}\n")
    plan_summary = "".join(plan_lines)

    # The human message is a plain template, NOT an f-string: the values are
    # substituted by format_prompt() below. Interpolating them here would make
    # ChatPromptTemplate parse the scraped findings as template text, so any
    # '{' or '}' in them (JSON, code snippets, ...) would break the prompt.
    synthesis_prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a professional researcher tasked with writing a comprehensive and well-structured report based on collected findings.
        The report should address the research topic thoroughly, synthesizing the information gathered from various sources.
        Structure the report logically:
        1.  **Introduction:** Briefly introduce the topic and the report's scope (mentioning the research plan followed is good).
        2.  **Main Body:** Discuss the key findings, organizing them thematically or according to the research plan steps. Analyze, compare, and contrast information from different sources where applicable. **Crucially, cite your sources using bracketed numbers [X] corresponding to the reference list.**
        3.  **Conclusion:** Summarize the main points and offer concluding thoughts or potential areas for further research.

        Ensure the tone is objective, professional, and analytical. Base the report **strictly** on the provided findings. Do not add external knowledge. If findings are contradictory or incomplete, acknowledge this.
        """),
        ("human", """
        **Research Topic:** {topic}

        {plan_summary}

        **Collected Findings:**
        ```
        {formatted_results}
        ```

        Please generate the final research report in Markdown format based **only** on the information above. Ensure all claims derived from the findings are properly cited using the format [Reference_ID].
        """)
    ])

    try:
        response = await llm.ainvoke(synthesis_prompt.format_prompt(
            topic=topic,
            plan_summary=plan_summary,
            formatted_results=formatted_results,
        ).to_messages())
        final_report_md = response.content

        # Append the reference list automatically to the end of the generated markdown
        if references:
            report_references_section = "\n\n## References\n\n"
            # Sort refs by ID for consistent output
            sorted_refs = sorted(references.values(), key=lambda x: x['id'])
            for ref in sorted_refs:
                report_references_section += f"[{ref['id']}] {ref['title']} - {ref['url']}\n"
            final_report_md += report_references_section

        logger.info("Successfully synthesized the final report.")
        if output_dir:
            _save_report_to_md(final_report_md, output_dir)
        return {"final_report": final_report_md}

    except Exception as e:
        logger.error(f"Error during report synthesis: {e}", exc_info=True)
        return {"error_message": f"LLM Error during synthesis: {e}"}


# --- Langgraph Edges and Conditional Logic ---

def should_continue(state: DeepResearchState) -> str:
    """Choose the routing key for the edge leaving the execution node.

    Returns:
        "end_run" when a stop was requested, an error message is recorded, or
        the plan is missing/empty; "execute_research" while
        ``current_step_index`` is still inside the plan; otherwise
        "synthesize_report".
    """
    logger.info("--- Evaluating Condition: Should Continue? ---")

    # A stop request takes priority over everything else.
    if state.get('stop_requested'):
        logger.info("Stop requested, routing to END.")
        return "end_run"

    # Any recorded error halts the run (no attempt to synthesize anyway).
    failure = state.get('error_message')
    if failure:
        logger.warning(f"Error detected: {failure}. Routing to END.")
        return "end_run"

    steps = state.get('research_plan')
    position = state.get('current_step_index', 0)

    # Without a plan there is nothing to execute or synthesize.
    if not steps:
        logger.warning("No research plan found, cannot continue execution. Routing to END.")
        return "end_run"

    total = len(steps)
    if position >= total:
        logger.info("All plan steps processed. Routing to Synthesis.")
        return "synthesize_report"

    logger.info(
        f"Plan has pending steps (current index {position}/{total}). Routing to Research Execution.")
    return "execute_research"


# --- DeepResearchAgent Class ---

class DeepResearchAgent:
    """Runs a plan -> execute -> synthesize research workflow as a Langgraph graph.

    One instance runs at most one task at a time: ``run()`` refuses to start
    while a previous runner task is still pending, and ``stop()`` signals the
    active task through a ``threading.Event``.
    """

    def __init__(self, llm: Any, browser_config: Dict[str, Any], mcp_server_config: Optional[Dict[str, Any]] = None):
        """
        Initializes the DeepResearchAgent.

        Args:
            llm: The Langchain compatible language model instance.
            browser_config: Configuration dictionary for the BrowserUseAgent tool.
                            Example: {"headless": True, "window_width": 1280, ...}
            mcp_server_config: Optional configuration for the MCP client.
        """
        self.llm = llm
        self.browser_config = browser_config
        self.mcp_server_config = mcp_server_config
        self.mcp_client = None  # Created lazily by _setup_tools()
        self.stopped = False  # Set by stop(), cleared by close()
        self.graph = self._compile_graph()
        self.current_task_id: Optional[str] = None
        self.stop_event: Optional[threading.Event] = None
        self.runner: Optional[asyncio.Task] = None  # To hold the asyncio task for run

    async def _setup_tools(self, task_id: str, stop_event: threading.Event, max_parallel_browsers: int = 1) -> List[
        Tool]:
        """Build the tool list: file I/O tools, the browser search tool and optional MCP tools.

        Args:
            task_id: Task identifier forwarded to the browser search tool.
            stop_event: Stop signal forwarded to the browser search tool.
            max_parallel_browsers: Max parallel browsers for the search tool.

        Returns:
            The tools, de-duplicated by ``tool.name``. When two tools share a
            name, the one added later replaces the earlier one.
        """
        tools = [WriteFileTool(), ReadFileTool(), ListDirectoryTool()]  # Basic file operations
        browser_use_tool = create_browser_search_tool(
            llm=self.llm,
            browser_config=self.browser_config,
            task_id=task_id,
            stop_event=stop_event,
            max_parallel_browsers=max_parallel_browsers
        )
        tools.append(browser_use_tool)

        # Add MCP tools if config is provided. A failure here is logged and the
        # agent carries on with the non-MCP tools only.
        if self.mcp_server_config:
            try:
                logger.info("Setting up MCP client and tools...")
                if not self.mcp_client:
                    self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config)
                mcp_tools = self.mcp_client.get_tools()
                logger.info(f"Loaded {len(mcp_tools)} MCP tools.")
                tools.extend(mcp_tools)
            except Exception as e:
                logger.error(f"Failed to set up MCP tools: {e}", exc_info=True)

        tools_map = {tool.name: tool for tool in tools}
        # Return a real list (not a dict view) so the value matches the
        # annotation and does not keep the temporary mapping alive.
        return list(tools_map.values())

    async def close_mcp_client(self):
        """Close the MCP client, if one is open, and forget it."""
        # Drop the reference first so a failing __aexit__ cannot leave a
        # half-closed client behind for the next _setup_tools() to reuse.
        client, self.mcp_client = self.mcp_client, None
        if client:
            await client.__aexit__(None, None, None)

    def _compile_graph(self) -> StateGraph:
        """Compiles the Langgraph state machine.

        Returns:
            The object returned by ``workflow.compile()``.
        """
        workflow = StateGraph(DeepResearchState)

        # Add nodes
        workflow.add_node("plan_research", planning_node)
        workflow.add_node("execute_research", research_execution_node)
        workflow.add_node("synthesize_report", synthesis_node)
        # Simple end node: logger.info() returns None, so the lambda logs and
        # then evaluates to an empty state update.
        workflow.add_node("end_run", lambda state: logger.info("--- Reached End Run Node ---") or {})

        # Define edges
        workflow.set_entry_point("plan_research")

        workflow.add_edge("plan_research", "execute_research")  # Always execute after planning

        # Conditional edge after execution
        workflow.add_conditional_edges(
            "execute_research",
            should_continue,
            {
                "execute_research": "execute_research",  # Loop back if more steps
                "synthesize_report": "synthesize_report",  # Move to synthesis if done
                "end_run": "end_run"  # End if stop requested or error
            }
        )

        workflow.add_edge("synthesize_report", "end_run")  # End after synthesis

        return workflow.compile()

    async def run(self, topic: str, save_dir: Optional[str] = None, task_id: Optional[str] = None, max_parallel_browsers: int = 1) -> Dict[
        str, Any]:
        """
        Starts the deep research process.

        Args:
            topic: The research topic.
            save_dir: Optional directory to save outputs for this task. If None, operates in memory-only mode.
            task_id: Optional existing task ID to resume. If None, a new ID is generated.
            max_parallel_browsers: Max parallel browsers for the search tool.

        Returns:
             A dictionary with ``status`` ("completed", "stopped", "error",
             "cancelled" or "finished_incomplete"), ``message``, ``task_id``
             and ``final_state``, plus ``report_file_path`` when an output
             directory is in use and a final report was produced. If a run is
             already in progress, a short ``status="error"`` dictionary is
             returned instead.
        """
        if self.runner and not self.runner.done():
            logger.warning("Agent is already running. Please stop the current task first.")
            return {"status": "error", "message": "Agent already running.", "task_id": self.current_task_id}

        self.current_task_id = task_id if task_id else str(uuid.uuid4())
        output_dir = None

        if save_dir:
            output_dir = os.path.join(save_dir, self.current_task_id)
            os.makedirs(output_dir, exist_ok=True)
            logger.info(f"[AsyncGen] Output directory: {output_dir}")
        else:
            logger.info("[AsyncGen] Running in memory-only mode (no save_dir provided)")

        logger.info(f"[AsyncGen] Starting research task ID: {self.current_task_id} for topic: '{topic}'")

        self.stop_event = threading.Event()
        _AGENT_STOP_FLAGS[self.current_task_id] = self.stop_event
        agent_tools = await self._setup_tools(self.current_task_id, self.stop_event, max_parallel_browsers)
        initial_state: DeepResearchState = {
            "task_id": self.current_task_id,
            "topic": topic,
            "research_plan": [],
            "search_results": [],
            "messages": [],
            "llm": self.llm,
            "tools": agent_tools,
            "output_dir": output_dir,
            "browser_config": self.browser_config,
            "final_report": None,
            "current_step_index": 0,
            "stop_requested": False,
            "error_message": None,
        }

        if task_id and output_dir:
            # Only try to resume from files if we have a task_id and output_dir
            logger.info(f"Attempting to resume task {task_id}...")
            loaded_state = _load_previous_state(task_id, output_dir)
            initial_state.update(loaded_state)
            if loaded_state.get("research_plan"):
                logger.info(
                    f"Resuming with {len(loaded_state['research_plan'])} plan steps and {len(loaded_state.get('search_results', []))} existing results.")
                # The topic passed to this call wins over any stored topic.
                initial_state["topic"] = topic
            else:
                logger.warning(f"Resume requested for {task_id}, but no previous plan found. Starting fresh.")
                initial_state["current_step_index"] = 0

        # --- Execute Graph using ainvoke ---
        # Remember the id now: the cleanup below resets self.current_task_id.
        task_id_to_clean = self.current_task_id
        final_state = None
        status = "unknown"
        message = None
        try:
            logger.info(f"Invoking graph execution for task {self.current_task_id}...")
            self.runner = asyncio.create_task(self.graph.ainvoke(initial_state))
            final_state = await self.runner
            logger.info(f"Graph execution finished for task {self.current_task_id}.")

            # Determine status based on final state
            if self.stop_event and self.stop_event.is_set():
                status = "stopped"
                message = "Research process was stopped by request."
                logger.info(message)
            elif final_state and final_state.get("error_message"):
                status = "error"
                message = final_state["error_message"]
                logger.error(f"Graph execution completed with error: {message}")
            elif final_state and final_state.get("final_report"):
                status = "completed"
                message = "Research process completed successfully."
                logger.info(message)
            else:
                # If it ends without error/report (e.g., empty plan, stopped before synthesis)
                status = "finished_incomplete"
                message = "Research process finished, but may be incomplete (no final report generated)."
                logger.warning(message)

        except asyncio.CancelledError:
            status = "cancelled"
            message = f"Agent run task cancelled for {self.current_task_id}."
            logger.info(message)
            # final_state stays None here
        except Exception as e:
            status = "error"
            message = f"Unhandled error during graph execution for {self.current_task_id}: {e}"
            logger.error(message, exc_info=True)
            # final_state stays None here
        finally:
            logger.info(f"Cleaning up resources for task {task_id_to_clean}")

            self.stop_event = None
            self.current_task_id = None
            self.runner = None  # Mark runner as finished
            # Drop this task's stop flag so the module-level registry does not
            # grow by one entry per run.
            _AGENT_STOP_FLAGS.pop(task_id_to_clean, None)
            # Close AND forget the MCP client so the next run() builds a fresh
            # one instead of reusing a client that has already been exited.
            await self.close_mcp_client()

        # Built after the try/finally on purpose: a `return` inside `finally`
        # would silently swallow KeyboardInterrupt/SystemExit and any error
        # raised by the handlers above.
        result = {
            "status": status,
            "message": message,
            "task_id": task_id_to_clean,  # Use the stored task_id
            "final_state": final_state if final_state else {}  # Return the final state dict
        }

        # Add report file path if we have an output directory and a final report was generated
        if output_dir and final_state and final_state.get("final_report"):
            result["report_file_path"] = os.path.join(output_dir, REPORT_FILENAME)

        return result

    async def _stop_lingering_browsers(self, task_id):
        """Attempts to stop any BrowserUseAgent instances associated with the task_id.

        Instances are looked up in ``_BROWSER_AGENT_INSTANCES`` by keys that
        start with ``"<task_id>_"``. Errors from individual ``stop()`` calls
        are logged and do not interrupt the loop.
        """
        keys_to_stop = [key for key in _BROWSER_AGENT_INSTANCES if key.startswith(f"{task_id}_")]
        if not keys_to_stop:
            return

        logger.warning(
            f"Found {len(keys_to_stop)} potentially lingering browser agents for task {task_id}. Attempting stop...")
        for key in keys_to_stop:
            # .get(): the entry may have been removed since the keys were listed.
            agent_instance = _BROWSER_AGENT_INSTANCES.get(key)
            try:
                if agent_instance:
                    # Assuming BU agent has an async stop method
                    await agent_instance.stop()
                    logger.info(f"Called stop() on browser agent instance {key}")
            except Exception as e:
                logger.error(f"Error calling stop() on browser agent instance {key}: {e}")

    async def stop(self):
        """Signals the currently running agent task to stop.

        Sets the stop event, marks the agent as stopped and asks any browser
        agents registered for the task to stop. Does nothing when no task is
        running.
        """
        if not self.current_task_id or not self.stop_event:
            logger.info("No agent task is currently running.")
            return

        logger.info(f"Stop requested for task ID: {self.current_task_id}")
        self.stop_event.set()  # Signal the stop event
        self.stopped = True
        await self._stop_lingering_browsers(self.current_task_id)

    def close(self):
        """Clear the ``stopped`` flag set by ``stop()``."""
        self.stopped = False

```