乐于分享
好东西不私藏

第 5 篇:浏览器自动化工具源码分析 – 掌控 Web 的瑞士军刀

第 5 篇:浏览器自动化工具源码分析 – 掌控 Web 的瑞士军刀

本文是 OpenManus 源码解读系列的第五篇,将深入剖析 BrowserUseTool 的实现细节,理解如何通过 Playwright 控制浏览器完成 30+ 种 Web 自动化操作。

📑 文章导读

在前四篇中,我们学习了 OpenManus 的架构、Agent、工具系统和 MCP 协议。本文将聚焦最强大的工具之一:BrowserUseTool,它是 OpenManus 与 Web 世界交互的核心。

你将学到:

  • • Playwright 集成和浏览器上下文管理
  • • 30+ 种浏览器操作的实现原理
  • • 页面内容提取与 LLM 结合
  • • 截图捕获和 Base64 编码
  • • 表单填充、多标签页、滚动等复杂场景
  • • 浏览器安全的最佳实践

🌟 为什么浏览器自动化如此重要?

在 AI Agent 时代,浏览器自动化是连接 AI 与真实世界的关键桥梁。

传统爬虫 vs 浏览器自动化

特性
传统爬虫 (requests + BeautifulSoup)
浏览器自动化 (Playwright)
JavaScript 渲染
❌ 不支持
✅ 完全支持
交互式操作
❌ 只读
✅ 点击、输入、滚动
验证码处理
❌ 困难
✅ 截图 + LLM 识别
登录/认证
❌ 需要手动处理 Cookie
✅ 自动保存 Session
动态加载
❌ 需要分析 API
✅ 等待元素加载
反爬虫
容易被检测
更难检测(模拟真实用户)

BrowserUseTool 的能力

BrowserUseTool 能够自动化完成:

  • • 🌐 页面导航(跳转、后退、刷新)
  • • 👆 元素交互(点击、输入、悬停)
  • • 📜 页面滚动(上/下/到文本)
  • • 📸 截图捕获(全屏、元素)
  • • 📝 表单填充(文本框、下拉菜单、复选框)
  • • 📑 标签页管理(创建、切换、关闭)
  • • 🔍 内容提取(文本、链接、图片)
  • • 🎯 元素定位(CSS Selector、XPath、文本)

一句话:它能像真人一样使用浏览器!

🏗️ BrowserUseTool 架构

位置:app/tool/browser_use_tool.py

class BrowserUseTool(BaseTool, Generic[Context]):    """    A powerful browser automation tool that allows interaction with web pages through various actions.    * This tool provides commands for controlling a browser session, navigating web pages, and extracting information    * It maintains state across calls, keeping the browser session alive until explicitly closed    * Use this when you need to browse websites, fill forms, click buttons, extract content, or perform web searches    * Each action requires specific parameters as defined in the tool's dependencies    Key capabilities include:    * Navigation: Go to specific URLs, go back, search the web, or refresh pages    * Interaction: Click elements, input text, select from dropdowns, send keyboard commands    * Scrolling: Scroll up/down by pixel amount or scroll to specific text    * Content extraction: Extract and analyze content from web pages based on specific goals    * Tab management: Switch between tabs, open new tabs, or close tabs    """

核心依赖

OpenManus 使用 browser_use 库而非直接使用 Playwright:

from browser_use import Browser as BrowserUseBrowserfrom browser_use import BrowserConfigfrom browser_use.browser.context import BrowserContext, BrowserContextConfigfrom browser_use.dom.service import DomService

核心属性

class BrowserUseTool(BaseTool, Generic[Context]):    name: str = "browser_use"    description: str = _BROWSER_DESCRIPTION    lock: asyncio.Lock = Field(default_factory=asyncio.Lock)    browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)    context: Optional[BrowserContext] = Field(default=None, exclude=True)    dom_service: Optional[DomService] = Field(default=None, exclude=True)    web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)    # Context for generic functionality    tool_context: Optional[Context] = Field(default=None, exclude=True)    llm: Optional[LLM] = Field(default_factory=LLM)

浏览器初始化

class BrowserUseTool(BaseTool, Generic[Context]):    async def _ensure_browser_initialized(self) -> BrowserContext:        """Ensure browser and context are initialized."""        if self.browser is None:            browser_config_kwargs = {"headless": False, "disable_security": True}            if config.browser_config:                from browser_use.browser.browser import ProxySettings                # handle proxy settings.                if config.browser_config.proxy and config.browser_config.proxy.server:                    browser_config_kwargs["proxy"] = ProxySettings(                        server=config.browser_config.proxy.server,                        username=config.browser_config.proxy.username,                        password=config.browser_config.proxy.password,                    )                browser_attrs = [                    "headless",                    "disable_security",                    "extra_chromium_args",                    "chrome_instance_path",                    "wss_url",                    "cdp_url",                ]                for attr in browser_attrs:                    value = getattr(config.browser_config, attr, None)                    if value is not None:                        if not isinstance(value, list) or value:                            browser_config_kwargs[attr] = value            self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))        if self.context is None:            context_config = BrowserContextConfig()            # if there is context config in the config, use it.            if (                config.browser_config                and hasattr(config.browser_config, "new_context_config")                and config.browser_config.new_context_config            ):                context_config = config.browser_config.new_context_config            self.context = await self.browser.new_context(context_config)            self.dom_service = DomService(await self.context.get_current_page())        return self.context

关键点

  • • 单例模式:浏览器实例只创建一次,复用提高效率
  • • 上下文隔离:每个 Context 是独立的 Cookie、LocalStorage 环境
  • • 灵活配置:支持无头模式、远程连接、代理、安全设置

🎯 浏览器操作详解

BrowserUseTool 支持多种浏览器操作,我们来看看核心实现:

1. 页面导航

1.1 访问 URL

class BrowserUseTool(BaseTool, Generic[Context]):    async def execute(        self,        action: str,        url: Optional[str] = None,        index: Optional[int] = None,        text: Optional[str] = None,        scroll_amount: Optional[int] = None,        tab_id: Optional[int] = None,        query: Optional[str] = None,        goal: Optional[str] = None,        keys: Optional[str] = None,        seconds: Optional[int] = None,        **kwargs,) -> ToolResult:        """Execute a specified browser action."""        async with self.lock:            try:                context = await self._ensure_browser_initialized()                # Navigation actions                if action == "go_to_url":                    if not url:                        return ToolResult(                            error="URL is required for 'go_to_url' action"                        )                    page = await context.get_current_page()                    await page.goto(url)                    await page.wait_for_load_state()                    return ToolResult(output=f"Navigated to {url}")

Playwright 的自动等待

  • • wait_until="load":等待 window.load 事件
  • • wait_until="domcontentloaded":等待 DOMContentLoaded 事件
  • • wait_until="networkidle":等待 500ms 内没有网络连接

1.2 页面后退/刷新

                elif action == "go_back":                    await context.go_back()                    return ToolResult(output="Navigated back")                elif action == "refresh":                    await context.refresh_page()                    return ToolResult(output="Refreshed current page")

2. 元素交互

2.1 点击元素

async def _click(    self,    selector: Optional[str] = None,    text: Optional[str] = None,    xpath: Optional[str] = None,    timeout: int = 30000) -> ToolResult:    """    点击页面元素    支持多种定位方式:    - selector: CSS 选择器(例如: "#button", ".class")    - text: 文本匹配(例如: "提交")    - xpath: XPath 表达式    Playwright 会自动:    1. 等待元素可见    2. 等待元素可点击    3. 滚动到元素位置    4. 执行点击    """    try:        # 定位元素        element = await self._find_element(            selector=selector,            text=text,            xpath=xpath,            timeout=timeout        )        if not element:            return ToolResult(error="Element not found")        # 点击元素        await element.click()        # 等待页面稳定        await self._page.wait_for_load_state("networkidle")        return ToolResult(output="Element clicked successfully")    except Exception as e:        return ToolResult(error=f"Failed to click: {e}")async def _find_element(    self,    selector: Optional[str] = None,    text: Optional[str] = None,    xpath: Optional[str] = None,    timeout: int = 30000) -> Optional[Any]:    """定位页面元素"""    if selector:        # CSS 选择器        return await self._page.wait_for_selector(            selector,            timeout=timeout        )    elif text:        # 文本匹配(使用 XPath)        return await self._page.wait_for_selector(            f"text={text}",            timeout=timeout        )    elif xpath:        # XPath        return await self._page.wait_for_selector(            f"xpath={xpath}",            timeout=timeout        )    return None

关键点

  • • 自动等待:Playwright 自动等待元素可点击
  • • 多种定位:支持 CSS、文本、XPath
  • • 智能滚动:如果元素在可视区域外,自动滚动

2.2 输入文本

async def _input_text(    self,    text: str,    selector: Optional[str] = None,    xpath: Optional[str] = None,    clear: bool = True,    timeout: int = 30000) -> ToolResult:    """    在输入框中输入文本    Args:        text: 要输入的文本        selector: CSS 选择器        xpath: XPath 表达式        clear: 是否先清空输入框        timeout: 超时时间    """    try:        # 定位输入框        element = await self._find_element(            selector=selector,            xpath=xpath,            timeout=timeout        )        if not element:            return ToolResult(error="Input element not found")        # 清空输入框        if clear:            await element.clear()        # 输入文本        await element.type(text)        return ToolResult(output=f"Text entered: {text}")    except Exception as e:        return ToolResult(error=f"Failed to input text: {e}")

2.3 选择下拉菜单

async def _select_option(    self,    selector: str,    value: Optional[str] = None,    label: Optional[str] = None,    index: Optional[int] = None,    timeout: int = 30000) -> ToolResult:    """    选择下拉菜单选项    Args:        selector: CSS 选择器(必须)        value: 选项的 value 属性        label: 选项的显示文本        index: 选项的索引    """    try:        # 定位下拉菜单        element = await self._find_element(            selector=selector,            timeout=timeout        )        if not element:            return ToolResult(error="Select element not found")        # 选择选项        if value:            await element.select_option(value=value)        elif label:            await element.select_option(label=label)        elif index is not None:            await element.select_option(index=index)        return ToolResult(output="Option selected successfully")    except Exception as e:        return ToolResult(error=f"Failed to select option: {e}")

3. 页面滚动

3.1 上下滚动

async def _scroll_up(self, amount: Optional[int] = None) -> ToolResult:    """向上滚动"""    try:        if amount:            # 滚动指定像素            await self._page.evaluate(                f"window.scrollBy(0, -{amount})"            )        else:            # 滚动一屏            viewport_height = await self._page.evaluate(                "window.innerHeight"            )            await self._page.evaluate(                f"window.scrollBy(0, -{viewport_height * 0.8})"            )        return ToolResult(output="Scrolled up")    except Exception as e:        return ToolResult(error=f"Failed to scroll up: {e}")async def _scroll_down(self, amount: Optional[int] = None) -> ToolResult:    """向下滚动"""    try:        if amount:            await self._page.evaluate(                f"window.scrollBy(0, {amount})"            )        else:            viewport_height = await self._page.evaluate(                "window.innerHeight"            )            await self._page.evaluate(                f"window.scrollBy(0, {viewport_height * 0.8})"            )        return ToolResult(output="Scrolled down")    except Exception as e:        return ToolResult(error=f"Failed to scroll down: {e}")

3.2 滚动到文本

async def _scroll_to_text(    self,    text: str,    timeout: int = 30000) -> ToolResult:    """    滚动到包含指定文本的元素    实现原理:    1. 使用 JavaScript 查找包含文本的元素    2. 滚动到元素位置    3. 高亮显示元素(可选)    """    try:        # 查找文本元素        element = await self._page.wait_for_selector(            f"text={text}",            timeout=timeout        )        if not element:            return ToolResult(error=f"Text not found: {text}")        # 滚动到元素        await element.scroll_into_view_if_needed()        return ToolResult(output=f"Scrolled to text: {text}")    except Exception as e:        return ToolResult(error=f"Failed to scroll to text: {e}")

4. 截图功能

4.1 全屏截图

async def _screenshot(    self,    path: Optional[str] = None,    full_page: bool = True) -> ToolResult:    """    截取页面截图    Args:        path: 保存路径(可选,不传则返回 base64)        full_page: 是否完整页面截图(包括滚动区域)    Returns:        ToolResult with base64_image    """    try:        # 截图        screenshot = await self._page.screenshot(            path=path,            full_page=full_page        )        logger.info(            f"Screenshot taken{' and saved to ' + path if path else ''}"        )        # 转换为 base64(如果没有保存到文件)        if not path:            base64_image = base64.b64encode(screenshot).decode()        else:            # 从文件读取            with open(path, "rb") as f:                base64_image = base64.b64encode(f.read()).decode()        return ToolResult(            output=f"Screenshot captured{' and saved' if path else ''}",            base64_image=base64_image        )    except Exception as e:        return ToolResult(error=f"Failed to take screenshot: {e}")

4.2 元素截图

async def _screenshot_element(    self,    selector: str,    path: Optional[str] = None) -> ToolResult:    """    截取特定元素的截图    Args:        selector: CSS 选择器        path: 保存路径(可选)    """    try:        # 定位元素        element = await self._page.wait_for_selector(selector)        if not element:            return ToolResult(error=f"Element not found: {selector}")        # 截图元素        screenshot = await element.screenshot(path=path)        if not path:            base64_image = base64.b64encode(screenshot).decode()        else:            with open(path, "rb") as f:                base64_image = base64.b64encode(f.read()).decode()        return ToolResult(            output=f"Element screenshot captured",            base64_image=base64_image        )    except Exception as e:        return ToolResult(error=f"Failed to screenshot element: {e}")

5. 内容提取

5.1 提取页面文本

async def _extract_text(self) -> ToolResult:    """    提取页面上的所有文本内容    实现原理:    使用 JavaScript 遍历 DOM,提取可见文本    """    try:        # 使用 JavaScript 提取文本        text = await self._page.evaluate("""            () => {                // 获取 body 文本                const text = document.body.innerText;                // 清理多余空白                return text.replace(/\\s+/g, ' ').trim();            }        """)        # 如果文本太长,进行截断        if len(text) > self.config.max_text_length:            text = text[:self.config.max_text_length] + "... (truncated)"        return ToolResult(output=text)    except Exception as e:        return ToolResult(error=f"Failed to extract text: {e}")

5.2 提取链接

async def _extract_links(self) -> ToolResult:    """    提取页面上的所有链接    返回格式:    - URL    - 链接文本    """    try:        links = await self._page.evaluate("""            () => {                const links = [];                document.querySelectorAll('a').forEach(a => {                    const href = a.href;                    const text = a.innerText.trim();                    if (href && text) {                        links.push({                            url: href,                            text: text.substring(0, 100)  // 限制文本长度                        });                    }                });                return links;            }        """)        # 格式化输出        result = []        for link in links[:self.config.max_links]:  # 限制链接数量            result.append(f"{link['text']} → {link['url']}")        return ToolResult(output="\n".join(result))    except Exception as e:        return ToolResult(error=f"Failed to extract links: {e}")

5.3 提取图片

async def _extract_images(self) -> ToolResult:    """    提取页面上的所有图片 URL    """    try:        images = await self._page.evaluate("""            () => {                const images = [];                document.querySelectorAll('img').forEach(img => {                    const src = img.src;                    const alt = img.alt || 'No description';                    if (src) {                        images.push({                            url: src,                            alt: alt.substring(0, 100)                        });                    }                });                return images;            }        """)        # 格式化输出        result = []        for img in images[:self.config.max_images]:            result.append(f"{img['alt']} → {img['url']}")        return ToolResult(output="\n".join(result))    except Exception as e:        return ToolResult(error=f"Failed to extract images: {e}")

6. 表单操作

6.1 完整表单填充

async def _fill_form(    self,    form_selector: str,    data: Dict[str, Any],    submit: bool = True) -> ToolResult:    """    填充表单并提交    Args:        form_selector: 表单 CSS 选择器        data: 表单数据(字段名: 值)        submit: 是否提交表单    """    try:        # 遍历表单字段        for field_name, value in data.items():            # 构造字段选择器            field_selector = f"{form_selector} [name='{field_name}']"            # 判断字段类型            field_type = await self._get_field_type(field_selector)            if field_type == "select":                # 下拉菜单                await self._select_option(                    selector=field_selector,                    value=value                )            elif field_type == "checkbox":                # 复选框                if value:                    await self._check(field_selector)                else:                    await self._uncheck(field_selector)            else:                # 文本输入框                await self._input_text(                    text=str(value),                    selector=field_selector                )        # 提交表单        if submit:            await self._click(selector=f"{form_selector} [type='submit']")        return ToolResult(output="Form filled and submitted")    except Exception as e:        return ToolResult(error=f"Failed to fill form: {e}")async def _get_field_type(self, selector: str) -> str:    """获取表单字段类型"""    element = await self._page.query_selector(selector)    if not element:        return "text"    tag_name = await element.evaluate("el => el.tagName")    type_attr = await element.get_attribute("type") or ""    if tag_name == "SELECT":        return "select"    elif type_attr.lower() == "checkbox":        return "checkbox"    else:        return "text"

7. 标签页管理

7.1 获取所有标签页

async def _get_tabs(self) -> ToolResult:    """    获取所有打开的标签页信息    """    try:        tabs = []        for i, page in enumerate(self._context.pages):            tabs.append({                "index": i,                "title": await page.title(),                "url": page.url,                "active": page == self._page            })        # 格式化输出        result = []        for tab in tabs:            active_marker = "▶ " if tab["active"] else "  "            result.append(f"{active_marker}{tab['index']}: {tab['title']} ({tab['url']})")        return ToolResult(output="\n".join(result))    except Exception as e:        return ToolResult(error=f"Failed to get tabs: {e}")

7.2 切换标签页

async def _switch_tab(self, index: int) -> ToolResult:    """    切换到指定索引的标签页    """    try:        pages = self._context.pages        if index < 0 or index >= len(pages):            return ToolResult(error=f"Invalid tab index: {index}")        # 切换到指定标签页        self._page = pages[index]        return ToolResult(            output=f"Switched to tab {index}: {await self._page.title()}"        )    except Exception as e:        return ToolResult(error=f"Failed to switch tab: {e}")

7.3 打开新标签页

async def _open_new_tab(self) -> ToolResult:    """    打开新标签页    """    try:        # 创建新页面        self._page = await self._context.new_page()        return ToolResult(output="New tab opened")    except Exception as e:        return ToolResult(error=f"Failed to open new tab: {e}")

8. 网络请求拦截

async def _intercept_requests(self, patterns: List[str]) -> ToolResult:    """    拦截匹配的网络请求    Args:        patterns: URL 匹配模式(例如: "**/*.jpg", "**/api/**")    Returns:        拦截的请求列表    """    try:        intercepted = []        # 设置拦截器        async def handle_request(route, request):            # 检查是否匹配模式            for pattern in patterns:                if self._match_pattern(request.url, pattern):                    intercepted.append({                        "url": request.url,                        "method": request.method,                        "headers": dict(request.headers)                    })            # 继续请求(或可以中断)            await route.continue_()        # 启用拦截        await self._page.route("**/*", handle_request)        # 等待一段时间收集请求        await asyncio.sleep(2)        # 格式化输出        result = []        for req in intercepted:            result.append(f"{req['method']} {req['url']}")        return ToolResult(output="\n".join(result))    except Exception as e:        return ToolResult(error=f"Failed to intercept requests: {e}")

🧠 智能内容提取:Playwright + LLM

OpenManus 最强大的功能之一是:用 LLM 理解页面内容

场景:提取商品信息

传统方法(XPath):

# 硬编码 XPath,页面一改就失效name = page.query_selector("//div[@class='product-title']/h1").textprice = page.query_selector("//span[@class='price']").text

OpenManus 方法(Playwright + LLM):

# 1. 截图页面screenshot = await page.screenshot()base64_image = base64.b64encode(screenshot).decode()# 2. 发送给 LLMllm_response = await llm.complete(    messages=[        {            "role": "user",            "content": [                {"type": "text", "text": "Extract product info from this image"},                {"type": "image_url", "image_url": f"data:image/png;base64,{base64_image}"}            ]        }    ])# 3. LLM 返回结构化数据# {#   "name": "iPhone 15 Pro",#   "price": "$999",#   "description": "Latest Apple flagship...",#   "rating": 4.8# }

实现:OpenManus 的搜索 + 提取 + 总结

async def search_and_extract(    agent: Manus,    query: str) -> str:    """    搜索并提取关键信息    流程:    1. 搜索关键词    2. 访问结果页面    3. 截图页面    4. LLM 提取信息    5. 总结    """    # 1. 搜索    search_result = await agent.run(f"搜索: {query}")    # 2. 访问第一个结果    browser = agent.available_tools.get_tool("browser_use_tool")    # 假设搜索结果包含 URL    urls = extract_urls(search_result)    if not urls:        return "No results found"    # 3. 访问页面    await browser.goto(urls[0])    # 4. 截图    screenshot_result = await browser.screenshot()    # 5. 使用 LLM 提取信息    extract_prompt = f"""    从这张截图中提取以下信息:    - 标题    - 关键点(3-5个)    - 总结(一段文字)    以 JSON 格式返回。    """    extraction = await agent.llm.complete(        messages=[            {"role": "user", "content": extract_prompt},            {                "role": "user",                "content": {                    "type": "image_url",                    "image_url": f"data:image/png;base64,{screenshot_result.base64_image}"                }            }        ]    )    return extraction

🔒 安全最佳实践

1. 内容过滤(防止 XSS)

import redef sanitize_html(html: str) -> str:    """清理 HTML,移除危险标签"""    # 移除 script 标签    html = re.sub(r'<script.*?</script>', '', html, flags=re.DOTALL)    # 移除事件处理器    html = re.sub(r'\son\w+="[^"]*"', '', html)    html = re.sub(r"\son\w+='[^']*'", '', html)    return html# 在提取内容前清理html = await page.content()safe_html = sanitize_html(html)

2. URL 白名单

class SecureBrowserUseTool(BrowserUseTool):    ALLOWED_DOMAINS = [        "github.com",        "stackoverflow.com",        "wikipedia.org"    ]    async def _goto(self, url: str) -> ToolResult:        # 验证域名        from urllib.parse import urlparse        domain = urlparse(url).netloc        if domain not in self.ALLOWED_DOMAINS:            return ToolResult(                error=f"Domain not allowed: {domain}"            )        return await super()._goto(url)

3. 资源限制

# 限制浏览器资源使用context_options = {    # 限制每个标签页的内存使用    "viewport": {"width": 1920, "height": 1080},    # 禁用部分资源(图片、CSS)    "blocked_resources": ["image", "stylesheet", "font", "media"],    # 超时设置    "timeout": 30000,}

4. 沙箱模式

# 使用 Docker 运行浏览器browser = await self._playwright.chromium.launch(    executable_path="/usr/bin/google-chrome",    args=[        "--no-sandbox",  # 在 Docker 中运行需要        "--disable-setuid-sandbox",        "--disable-gpu",        "--disable-dev-shm-usage",    ])

📊 性能优化

1. 浏览器复用

class BrowserUseTool(BaseTool):    # 类变量,全局复用    _playwright = None    _browser = None    _context = None    async def setup(self):        """只创建一次浏览器实例"""        if not BrowserUseTool._playwright:            BrowserUseTool._playwright = await async_playwright().start()        if not BrowserUseTool._browser:            BrowserUseTool._browser = await self._playwright.chromium.launch()        # 每个实例有自己的 Context 和 Page        if not self._context:            self._context = await self._browser.new_context()        if not self._page:            self._page = await self._context.new_page()

2. 并行操作

# 同时截图多个元素async def screenshot_multiple(self, selectors: List[str]) -> List[bytes]:    tasks = [        self._page.locator(selector).screenshot()        for selector in selectors    ]    # 并行执行    screenshots = await asyncio.gather(*tasks)    return screenshots

3. 网络拦截

# 禁用图片加载,提升速度await self._page.route("**/*.png", lambda route: route.abort())await self._page.route("**/*.jpg", lambda route: route.abort())await self._page.route("**/*.gif", lambda route: route.abort())# 只加载 HTML 和 API 请求await self._page.route("**/*", lambda route: route.continue_())

4. 内容缓存

from functools import lru_cacheclass CachedBrowserUseTool(BrowserUseTool):    @lru_cache(maxsize=100)    async def _goto(self, url: str) -> ToolResult:        """缓存页面访问结果"""        return await super()._goto(url)

🔧 调试技巧

1. 非无头模式调试

# 配置可见浏览器config = BrowserConfig(    headless=False,  # 显示浏览器界面    slow_mo=500,     # 每个操作延迟 500ms(方便观察))tool = BrowserUseTool(config)

2. 视频录制

# 录制整个操作过程context = await browser.new_context(    record_video_dir="videos/",    record_video_size={"width": 1920, "height": 1080})# 操作...# 保存视频await context.close()

3. 实时日志

# 监听所有网络请求page.on("request", lambda request: print(f">> {request.method} {request.url}"))page.on("response", lambda response: print(f"<< {response.status} {response.url}"))# 监听控制台输出page.on("console", lambda msg: print(f"PAGE LOG: {msg.text}"))

4. 保存页面状态

# 保存 HTMLhtml = await page.content()with open("page.html", "w") as f:    f.write(html)# 保存截图await page.screenshot(path="page.png")# 保存 HAR(HTTP Archive,所有网络请求)await page.route("**", save_request)

🎓 最佳实践总结

✅ 应该做的

  1. 1. 复用浏览器实例:避免重复启动
  2. 2. 使用合适的选择器:优先使用 CSS 选择器,文本次之,XPath 最后
  3. 3. 添加超时:防止无限等待
  4. 4. 错误处理:每个操作都捕获异常
  5. 5. 清理资源:操作完成后关闭浏览器
  6. 6. 使用 LLM 辅助:截图 + LLM 理解页面内容
  7. 7. 添加等待:等待元素加载完成
  8. 8. 限制截图大小:避免返回过大的 base64 图片

❌ 不应该做的

  1. 1. 频繁启动浏览器:性能开销大
  2. 2. 硬编码 XPath:页面一改就失效
  3. 3. 无限等待:没有超时设置
  4. 4. 忽略错误:不处理异常
  5. 5. 不清理资源:导致内存泄漏
  6. 6. 截图不压缩:传输大量数据
  7. 7. 不使用自动等待:等待元素加载
  8. 8. 忽略浏览器安全:XSS、CSRF

📚 总结

本文深入剖析了 BrowserUseTool 的实现:

核心能力

  1. 1. 30+ 浏览器操作:导航、交互、滚动、截图、提取、表单、标签页
  2. 2. Playwright 集成:强大的浏览器自动化库
  3. 3. 智能等待:自动等待元素加载和可点击
  4. 4. 多定位方式:CSS 选择器、文本、XPath
  5. 5. 内容提取:文本、链接、图片
  6. 6. 截图功能:全屏、元素、Base64 编码

架构设计

BrowserUseTool    ↓Playwright (Browser、Context、Page)    ↓Chromium / Firefox / WebKit

关键技术

  • • 自动等待:无需手动 sleep
  • • 上下文隔离:独立 Cookie、Session
  • • 智能滚动:自动滚动到元素
  • • 截图 + LLM:AI 理解页面内容
  • • Base64 编码:图片嵌入消息

应用场景

  1. 1. 自动化测试:代替 Selenium
  2. 2. 数据抓取:动态页面爬虫
  3. 3. 表单填写:自动提交
  4. 4. 监控:页面变化检测
  5. 5. AI Agent:浏览网页、收集信息

上篇文章回顾

第 4 篇:《MCP协议集成详解》

本文是 OpenManus 源码解读系列的第四篇,将深入剖析 MCP (Model Context Protocol) 协议的集成机制,理解如何通过 MCP 实现工具能力的无限扩展。。

第 3 篇:《工具系统的设计哲学 – 打造可扩展的 AI 工具生态》

本文是 OpenManus 源码解读系列的第三篇,将深入剖析工具系统的设计哲学,理解 BaseTool、ToolResult、ToolCollection 等核心组件,以及动态参数解析的魔法

第 2 篇:《Agent架构深度剖析》

本文是 OpenManus 源码解读系列的第二篇,将深入剖析 Agent 架构的每一层设计,理解状态管理、消息循环、ReAct 模式等核心机制。

第 1 篇:《探秘 OpenManus:从源码角度解读开源 AI Agent 框架》

本文是 OpenManus 源码解读系列的第一篇,将从源码架构角度带你深入理解这个强大的 AI Agent 框架。

🎯 下篇文章预告

第 6 篇:《代码执行的安全沙箱》

我们将深入 PythonExecute 工具,学习:

  • • 进程隔离的实现(multiprocessing)
  • • 超时控制(asyncio.wait_for)
  • • 输出捕获(stdout/stderr)
  • • Docker 沙箱详解
  • • 资源限制(CPU、内存)
  • • 安全最佳实践

敬请期待!


本文关键词#OpenManus #浏览器自动化 #Playwright #网页抓取 #截图 #内容提取 #Web自动化 #AIAgent

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » 第 5 篇:浏览器自动化工具源码分析 – 掌控 Web 的瑞士军刀

评论 抢沙发

7 + 4 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮