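Part 5: Browser Automation Tool Source Code Analysis – A Swiss Army Knife for the Web
This is the fifth article in the OpenManus source code series. It examines how BrowserUseTool is implemented and how it drives a browser through Playwright to carry out 30+ kinds of web automation operations.
📑 Article Guide
The first four articles covered the OpenManus architecture, agents, the tool system, and the MCP protocol. This one focuses on one of the most powerful tools, BrowserUseTool, which is the core of how OpenManus interacts with the web.
You will learn:
• Playwright integration and browser context management
• How the 30+ browser operations are implemented
• Page content extraction combined with an LLM
• Screenshot capture and Base64 encoding
• Complex scenarios such as form filling, multiple tabs, and scrolling
• Best practices for browser security
🌟 Why Does Browser Automation Matter So Much?
In the age of AI agents, browser automation is a key bridge between AI and the real world.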
Traditional Crawlers vs. Browser Automation
[Comparison table: three columns; the cell contents did not survive extraction.]
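What BrowserUseTool Can Do
BrowserUseTool can automate:
• 🌐 Page navigation (go to a URL, go back, refresh)
• 👆 Element interaction (click, type, hover)
• 📜 Page scrolling (up, down, to a piece of text)
• 📸 Screenshot capture (full page, single element)
• 📝 Form filling (text fields, dropdowns, checkboxes)
• 📑 Tab management (open, switch, close)
• 🔍 Content extraction (text, links, images)
• 🎯 Element location (CSS selector, XPath, text)
In short: it can use a browser much like a person would.
🏗️ BrowserUseTool Architecture
Location: app/tool/browser_use_tool.py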
    class BrowserUseTool(BaseTool, Generic[Context]):
        """
        A powerful browser automation tool that allows interaction with web pages
        through various actions.

        * This tool provides commands for controlling a browser session,
          navigating web pages, and extracting information
        * It maintains state across calls, keeping the browser session alive
          until explicitly closed
        * Use this when you need to browse websites, fill forms, click buttons,
          extract content, or perform web searches
        * Each action requires specific parameters as defined in the tool's
          dependencies

        Key capabilities include:
        * Navigation: Go to specific URLs, go back, search the web, or refresh pages
        * Interaction: Click elements, input text, select from dropdowns,
          send keyboard commands
        * Scrolling: Scroll up/down by pixel amount or scroll to specific text
        * Content extraction: Extract and analyze content from web pages based
          on specific goals
        * Tab management: Switch between tabs, open new tabs, or close tabs
        """
Core Dependencies
OpenManus uses the browser_use library, which wraps Playwright, rather than calling Playwright directly:
    from browser_use import Browser as BrowserUseBrowser
    from browser_use import BrowserConfig
    from browser_use.browser.context import BrowserContext, BrowserContextConfig
    from browser_use.dom.service import DomService
Core Attributes
    class BrowserUseTool(BaseTool, Generic[Context]):
        name: str = "browser_use"
        description: str = _BROWSER_DESCRIPTION

        lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
        browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
        context: Optional[BrowserContext] = Field(default=None, exclude=True)
        dom_service: Optional[DomService] = Field(default=None, exclude=True)
        web_search_tool: WebSearch = Field(default_factory=WebSearch, exclude=True)

        # Context for generic functionality
        tool_context: Optional[Context] = Field(default=None, exclude=True)

        llm: Optional[LLM] = Field(default_factory=LLM)
Browser Initialization
    class BrowserUseTool(BaseTool, Generic[Context]):
        async def _ensure_browser_initialized(self) -> BrowserContext:
            """Ensure browser and context are initialized."""
            if self.browser is None:
                browser_config_kwargs = {"headless": False, "disable_security": True}

                if config.browser_config:
                    from browser_use.browser.browser import ProxySettings

                    # handle proxy settings.
                    if config.browser_config.proxy and config.browser_config.proxy.server:
                        browser_config_kwargs["proxy"] = ProxySettings(
                            server=config.browser_config.proxy.server,
                            username=config.browser_config.proxy.username,
                            password=config.browser_config.proxy.password,
                        )

                    browser_attrs = [
                        "headless",
                        "disable_security",
                        "extra_chromium_args",
                        "chrome_instance_path",
                        "wss_url",
                        "cdp_url",
                    ]

                    for attr in browser_attrs:
                        value = getattr(config.browser_config, attr, None)
                        if value is not None:
                            if not isinstance(value, list) or value:
                                browser_config_kwargs[attr] = value

                self.browser = BrowserUseBrowser(BrowserConfig(**browser_config_kwargs))

            if self.context is None:
                context_config = BrowserContextConfig()

                # if there is context config in the config, use it.
                if (
                    config.browser_config
                    and hasattr(config.browser_config, "new_context_config")
                    and config.browser_config.new_context_config
                ):
                    context_config = config.browser_config.new_context_config

                self.context = await self.browser.new_context(context_config)
                self.dom_service = DomService(await self.context.get_current_page())

            return self.context
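Key points:
• Lazy initialization: the browser instance is created once, on first use, and reused by later calls
• Context isolation: each context has its own cookies and localStorage
• Flexible configuration: headless mode, remote connections (wss_url, cdp_url), proxy, and security settings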
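The "create once, then reuse" behaviour described above can be sketched without a real browser. The snippet below is a minimal illustration, not OpenManus code: FakeBrowser and LazyBrowserHolder are hypothetical names, and the asyncio.sleep(0) call only stands in for the slow launch step.

```python
import asyncio
from typing import Optional


class FakeBrowser:
    """Hypothetical stand-in for a real browser; counts how often it is launched."""

    launches = 0

    def __init__(self) -> None:
        FakeBrowser.launches += 1


class LazyBrowserHolder:
    """Creates the browser on first use and hands back the same one afterwards."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._browser: Optional[FakeBrowser] = None

    async def ensure_browser(self) -> FakeBrowser:
        # The lock keeps two concurrent callers from both launching a browser.
        async with self._lock:
            if self._browser is None:
                await asyncio.sleep(0)  # stands in for the slow launch step
                self._browser = FakeBrowser()
            return self._browser


async def demo() -> bool:
    holder = LazyBrowserHolder()
    first, second = await asyncio.gather(
        holder.ensure_browser(), holder.ensure_browser()
    )
    return first is second  # both callers received the same instance
```

Running asyncio.run(demo()) launches the fake browser a single time even though two callers ask for it concurrently, which is the same guarantee the lock plus the `is None` checks give the real tool.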
🎯 Browser Operations in Detail
BrowserUseTool supports many browser operations. Let's look at the core implementations:
1. Page Navigation
1.1 Visiting a URL
    class BrowserUseTool(BaseTool, Generic[Context]):
        async def execute(
            self,
            action: str,
            url: Optional[str] = None,
            index: Optional[int] = None,
            text: Optional[str] = None,
            scroll_amount: Optional[int] = None,
            tab_id: Optional[int] = None,
            query: Optional[str] = None,
            goal: Optional[str] = None,
            keys: Optional[str] = None,
            seconds: Optional[int] = None,
            **kwargs,
        ) -> ToolResult:
            """Execute a specified browser action."""
            async with self.lock:
                try:
                    context = await self._ensure_browser_initialized()

                    # Navigation actions
                    if action == "go_to_url":
                        if not url:
                            return ToolResult(
                                error="URL is required for 'go_to_url' action"
                            )
                        page = await context.get_current_page()
                        await page.goto(url)
                        await page.wait_for_load_state()
                        return ToolResult(output=f"Navigated to {url}")
                    # ... remaining actions and the except clause are omitted in this excerpt
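Playwright's load states (wait_for_load_state() with no argument waits for "load"; page.goto accepts the same values through wait_until):
• wait_until="load": wait for the window load event
• wait_until="domcontentloaded": wait for the DOMContentLoaded event
• wait_until="networkidle": wait until there have been no network connections for at least 500 ms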
1.2 Going Back and Refreshing
                    elif action == "go_back":
                        await context.go_back()
                        return ToolResult(output="Navigated back")

                    elif action == "refresh":
                        await context.refresh_page()
                        return ToolResult(output="Refreshed current page")
2. Element Interaction
2.1 Clicking an Element
    async def _click(
        self,
        selector: Optional[str] = None,
        text: Optional[str] = None,
        xpath: Optional[str] = None,
        timeout: int = 30000,
    ) -> ToolResult:
        """
        Click a page element.

        Supported ways to locate it:
        - selector: CSS selector (e.g. "#button", ".class")
        - text: text match (e.g. "Submit")
        - xpath: XPath expression

        Playwright automatically:
        1. Waits for the element to be visible
        2. Waits for the element to be clickable
        3. Scrolls the element into view
        4. Performs the click
        """
        try:
            # Locate the element
            element = await self._find_element(
                selector=selector, text=text, xpath=xpath, timeout=timeout
            )
            if not element:
                return ToolResult(error="Element not found")

            # Click it
            await element.click()

            # Wait for the page to settle
            await self._page.wait_for_load_state("networkidle")

            return ToolResult(output="Element clicked successfully")
        except Exception as e:
            return ToolResult(error=f"Failed to click: {e}")

    async def _find_element(
        self,
        selector: Optional[str] = None,
        text: Optional[str] = None,
        xpath: Optional[str] = None,
        timeout: int = 30000,
    ) -> Optional[Any]:
        """Locate a page element."""
        if selector:
            # CSS selector
            return await self._page.wait_for_selector(selector, timeout=timeout)
        elif text:
            # Text match (Playwright's text= selector engine, not XPath)
            return await self._page.wait_for_selector(f"text={text}", timeout=timeout)
        elif xpath:
            # XPath
            return await self._page.wait_for_selector(f"xpath={xpath}", timeout=timeout)
        return None
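Key points:
• Auto-waiting: Playwright waits until the element is clickable
• Multiple locators: CSS, text, and XPath are supported
• Smart scrolling: an element outside the visible area is scrolled into view automatically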
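The three locator styles above all end up as a single selector string handed to Playwright. A minimal sketch of that mapping follows; build_selector is a hypothetical helper introduced here for illustration, while the "text=" and "xpath=" prefixes are Playwright's selector-engine syntax.

```python
from typing import Optional


def build_selector(
    selector: Optional[str] = None,
    text: Optional[str] = None,
    xpath: Optional[str] = None,
) -> Optional[str]:
    """Return one Playwright selector string, or None if nothing was given."""
    if selector:
        return selector            # plain CSS, e.g. "#submit"
    if text:
        return f"text={text}"      # Playwright text engine
    if xpath:
        return f"xpath={xpath}"    # Playwright XPath engine
    return None
```

When several arguments are supplied, this sketch gives CSS precedence, then text, then XPath, mirroring the if/elif order used in the article's _find_element.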
2.2 Typing Text
    async def _input_text(
        self,
        text: str,
        selector: Optional[str] = None,
        xpath: Optional[str] = None,
        clear: bool = True,
        timeout: int = 30000,
    ) -> ToolResult:
        """
        Type text into an input field.

        Args:
            text: the text to type
            selector: CSS selector
            xpath: XPath expression
            clear: whether to clear the field first
            timeout: timeout in milliseconds
        """
        try:
            # Locate the input field
            element = await self._find_element(
                selector=selector, xpath=xpath, timeout=timeout
            )
            if not element:
                return ToolResult(error="Input element not found")

            # Clear the field; element handles have no clear(), so fill with ""
            if clear:
                await element.fill("")

            # Type the text
            await element.type(text)

            return ToolResult(output=f"Text entered: {text}")
        except Exception as e:
            return ToolResult(error=f"Failed to input text: {e}")
2.3 Selecting from a Dropdown
    async def _select_option(
        self,
        selector: str,
        value: Optional[str] = None,
        label: Optional[str] = None,
        index: Optional[int] = None,
        timeout: int = 30000,
    ) -> ToolResult:
        """
        Select an option in a dropdown.

        Args:
            selector: CSS selector (required)
            value: the option's value attribute
            label: the option's visible text
            index: the option's index
        """
        try:
            # Locate the dropdown
            element = await self._find_element(selector=selector, timeout=timeout)
            if not element:
                return ToolResult(error="Select element not found")

            # Select the option
            if value:
                await element.select_option(value=value)
            elif label:
                await element.select_option(label=label)
            elif index is not None:
                await element.select_option(index=index)

            return ToolResult(output="Option selected successfully")
        except Exception as e:
            return ToolResult(error=f"Failed to select option: {e}")
3. Page Scrolling
3.1 Scrolling Up and Down
    async def _scroll_up(self, amount: Optional[int] = None) -> ToolResult:
        """Scroll up."""
        try:
            if amount:
                # Scroll by the given number of pixels
                await self._page.evaluate(f"window.scrollBy(0, -{amount})")
            else:
                # Scroll by roughly one screen (80% of the viewport height)
                viewport_height = await self._page.evaluate("window.innerHeight")
                await self._page.evaluate(
                    f"window.scrollBy(0, -{viewport_height * 0.8})"
                )
            return ToolResult(output="Scrolled up")
        except Exception as e:
            return ToolResult(error=f"Failed to scroll up: {e}")

    async def _scroll_down(self, amount: Optional[int] = None) -> ToolResult:
        """Scroll down."""
        try:
            if amount:
                await self._page.evaluate(f"window.scrollBy(0, {amount})")
            else:
                viewport_height = await self._page.evaluate("window.innerHeight")
                await self._page.evaluate(
                    f"window.scrollBy(0, {viewport_height * 0.8})"
                )
            return ToolResult(output="Scrolled down")
        except Exception as e:
            return ToolResult(error=f"Failed to scroll down: {e}")
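The two scroll methods differ only in the sign of the offset passed to window.scrollBy. As a rough sketch of that shared arithmetic (scroll_delta is a hypothetical helper, not part of the tool), the offset can be computed in one place:

```python
from typing import Optional


def scroll_delta(
    direction: str, viewport_height: int, amount: Optional[int] = None
) -> int:
    """Vertical pixel offset to pass to window.scrollBy(0, delta)."""
    if direction not in ("up", "down"):
        raise ValueError(f"unknown direction: {direction}")
    # An explicit amount wins; otherwise move 80% of the viewport height
    step = amount if amount else round(viewport_height * 0.8)
    return -step if direction == "up" else step
```

Scrolling by 80% of the viewport rather than a full screen keeps a strip of the previous content visible, so text that straddles the boundary is not skipped.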
3.2 Scrolling to Text
    async def _scroll_to_text(self, text: str, timeout: int = 30000) -> ToolResult:
        """
        Scroll to the element that contains the given text.

        How it works:
        1. Wait for an element matching Playwright's text= selector
        2. Scroll that element into view if it is not already visible
        """
        try:
            # Find the element containing the text
            element = await self._page.wait_for_selector(
                f"text={text}", timeout=timeout
            )
            if not element:
                return ToolResult(error=f"Text not found: {text}")

            # Scroll to the element
            await element.scroll_into_view_if_needed()

            return ToolResult(output=f"Scrolled to text: {text}")
        except Exception as e:
            return ToolResult(error=f"Failed to scroll to text: {e}")
4. Screenshots
4.1 Full-Page Screenshot
    async def _screenshot(
        self, path: Optional[str] = None, full_page: bool = True
    ) -> ToolResult:
        """
        Take a screenshot of the page.

        Args:
            path: where to save the file (optional)
            full_page: capture the whole page, including the scrollable area

        Returns:
            ToolResult with base64_image
        """
        try:
            # screenshot() returns the image bytes whether or not a path is given
            screenshot = await self._page.screenshot(path=path, full_page=full_page)
            logger.info(
                f"Screenshot taken{' and saved to ' + path if path else ''}"
            )

            # Convert to base64
            base64_image = base64.b64encode(screenshot).decode()

            return ToolResult(
                output=f"Screenshot captured{' and saved' if path else ''}",
                base64_image=base64_image,
            )
        except Exception as e:
            return ToolResult(error=f"Failed to take screenshot: {e}")
4.2 Element Screenshot
    async def _screenshot_element(
        self, selector: str, path: Optional[str] = None
    ) -> ToolResult:
        """
        Take a screenshot of a single element.

        Args:
            selector: CSS selector
            path: where to save the file (optional)
        """
        try:
            # Locate the element
            element = await self._page.wait_for_selector(selector)
            if not element:
                return ToolResult(error=f"Element not found: {selector}")

            # screenshot() returns the image bytes whether or not a path is given
            screenshot = await element.screenshot(path=path)
            base64_image = base64.b64encode(screenshot).decode()

            return ToolResult(
                output="Element screenshot captured",
                base64_image=base64_image,
            )
        except Exception as e:
            return ToolResult(error=f"Failed to screenshot element: {e}")
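Both screenshot methods finish with the same step: raw PNG bytes become a Base64 string that can be embedded in a message. A small stdlib sketch of that step is shown below, with a size guard added; to_data_url and the 5 MB default are illustrative assumptions, not values taken from OpenManus.

```python
import base64

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"  # the first 8 bytes of every PNG file


def to_data_url(png_bytes: bytes, max_bytes: int = 5_000_000) -> str:
    """Encode raw PNG bytes as a data URL, refusing oversized or non-PNG input."""
    if not png_bytes.startswith(PNG_SIGNATURE):
        raise ValueError("not a PNG image")
    if len(png_bytes) > max_bytes:
        raise ValueError("screenshot too large")
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return f"data:image/png;base64,{encoded}"
```

Base64 inflates the payload by about a third, which is one reason to cap the input size before encoding rather than after.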
5. Content Extraction
5.1 Extracting Page Text
    async def _extract_text(self) -> ToolResult:
        """
        Extract all text content from the page.

        How it works:
        Read document.body.innerText in the page and collapse whitespace.
        """
        try:
            # Extract the text with JavaScript
            text = await self._page.evaluate("""
                () => {
                    // Get the body text
                    const text = document.body.innerText;
                    // Collapse redundant whitespace
                    return text.replace(/\\s+/g, ' ').trim();
                }
            """)

            # Truncate text that is too long
            if len(text) > self.config.max_text_length:
                text = text[:self.config.max_text_length] + "... (truncated)"

            return ToolResult(output=text)
        except Exception as e:
            return ToolResult(error=f"Failed to extract text: {e}")
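The whitespace clean-up and truncation above can also be done on the Python side, which makes the logic easy to test without a browser. A minimal sketch follows; clean_text and its default max_length of 2000 are assumptions made for illustration.

```python
import re


def clean_text(raw: str, max_length: int = 2000) -> str:
    """Collapse runs of whitespace and truncate overly long text."""
    collapsed = re.sub(r"\s+", " ", raw).strip()
    if len(collapsed) > max_length:
        return collapsed[:max_length] + "... (truncated)"
    return collapsed
```

Truncating after collapsing whitespace means the length budget is spent on visible characters rather than on indentation and blank lines.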
5.2 Extracting Links
    async def _extract_links(self) -> ToolResult:
        """
        Extract all links on the page.

        Each result contains:
        - the URL
        - the link text
        """
        try:
            links = await self._page.evaluate("""
                () => {
                    const links = [];
                    document.querySelectorAll('a').forEach(a => {
                        const href = a.href;
                        const text = a.innerText.trim();
                        if (href && text) {
                            links.push({
                                url: href,
                                text: text.substring(0, 100)  // cap the text length
                            });
                        }
                    });
                    return links;
                }
            """)

            # Format the output
            result = []
            for link in links[:self.config.max_links]:  # cap the number of links
                result.append(f"{link['text']} → {link['url']}")

            return ToolResult(output="\n".join(result))
        except Exception as e:
            return ToolResult(error=f"Failed to extract links: {e}")
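For comparison, the same filtering rules (skip links with no href or no text, cap the text at 100 characters, return absolute URLs) can be applied to static HTML with only the standard library. This is an offline sketch under those assumptions, not how the tool itself works; LinkCollector and extract_links are hypothetical names.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional
from urllib.parse import urljoin


class LinkCollector(HTMLParser):
    """Collects {url, text} pairs for <a> tags that have both an href and text."""

    def __init__(self, base_url: str) -> None:
        super().__init__()
        self.base_url = base_url
        self.links: List[Dict[str, str]] = []
        self._href: Optional[str] = None
        self._text: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            self._href = dict(attrs).get("href")
            self._text = []

    def handle_data(self, data):
        if self._href is not None:
            self._text.append(data)

    def handle_endtag(self, tag):
        if tag == "a" and self._href is not None:
            text = " ".join("".join(self._text).split())
            if self._href and text:
                # urljoin resolves relative links, like a.href does in the browser
                url = urljoin(self.base_url, self._href)
                self.links.append({"url": url, "text": text[:100]})
            self._href = None


def extract_links(html: str, base_url: str) -> List[Dict[str, str]]:
    collector = LinkCollector(base_url)
    collector.feed(html)
    return collector.links
```

Unlike the browser version, this sees only the HTML as delivered, so links injected later by JavaScript are missed; that gap is the practical argument for extracting inside a real page.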
5.3 Extracting Images
    async def _extract_images(self) -> ToolResult:
        """
        Extract the URLs of all images on the page.
        """
        try:
            images = await self._page.evaluate("""
                () => {
                    const images = [];
                    document.querySelectorAll('img').forEach(img => {
                        const src = img.src;
                        const alt = img.alt || 'No description';
                        if (src) {
                            images.push({
                                url: src,
                                alt: alt.substring(0, 100)
                            });
                        }
                    });
                    return images;
                }
            """)

            # Format the output
            result = []
            for img in images[:self.config.max_images]:
                result.append(f"{img['alt']} → {img['url']}")

            return ToolResult(output="\n".join(result))
        except Exception as e:
            return ToolResult(error=f"Failed to extract images: {e}")
6. Form Operations
6.1 Filling a Whole Form
    async def _fill_form(
        self, form_selector: str, data: Dict[str, Any], submit: bool = True
    ) -> ToolResult:
        """
        Fill in a form and optionally submit it.

        Args:
            form_selector: CSS selector of the form
            data: form data (field name: value)
            submit: whether to submit the form
        """
        try:
            # Walk through the form fields
            for field_name, value in data.items():
                # Build the selector for this field
                field_selector = f"{form_selector} [name='{field_name}']"

                # Work out the field type
                field_type = await self._get_field_type(field_selector)

                if field_type == "select":
                    # Dropdown
                    await self._select_option(selector=field_selector, value=value)
                elif field_type == "checkbox":
                    # Checkbox
                    if value:
                        await self._check(field_selector)
                    else:
                        await self._uncheck(field_selector)
                else:
                    # Text input
                    await self._input_text(text=str(value), selector=field_selector)

            # Submit the form
            if submit:
                await self._click(selector=f"{form_selector} [type='submit']")

            return ToolResult(
                output="Form filled and submitted" if submit else "Form filled"
            )
        except Exception as e:
            return ToolResult(error=f"Failed to fill form: {e}")

    async def _get_field_type(self, selector: str) -> str:
        """Return the type of a form field."""
        element = await self._page.query_selector(selector)
        if not element:
            return "text"

        tag_name = await element.evaluate("el => el.tagName")
        type_attr = await element.get_attribute("type") or ""

        if tag_name == "SELECT":
            return "select"
        elif type_attr.lower() == "checkbox":
            return "checkbox"
        else:
            return "text"
7. Tab Management
7.1 Listing All Tabs
    async def _get_tabs(self) -> ToolResult:
        """
        Return information about every open tab.
        """
        try:
            tabs = []
            for i, page in enumerate(self._context.pages):
                tabs.append({
                    "index": i,
                    "title": await page.title(),
                    "url": page.url,
                    "active": page == self._page,
                })

            # Format the output
            result = []
            for tab in tabs:
                active_marker = "▶ " if tab["active"] else "  "
                result.append(
                    f"{active_marker}{tab['index']}: {tab['title']} ({tab['url']})"
                )

            return ToolResult(output="\n".join(result))
        except Exception as e:
            return ToolResult(error=f"Failed to get tabs: {e}")
7.2 Switching Tabs
    async def _switch_tab(self, index: int) -> ToolResult:
        """
        Switch to the tab at the given index.
        """
        try:
            pages = self._context.pages
            if index < 0 or index >= len(pages):
                return ToolResult(error=f"Invalid tab index: {index}")

            # Switch to the requested tab
            self._page = pages[index]

            return ToolResult(
                output=f"Switched to tab {index}: {await self._page.title()}"
            )
        except Exception as e:
            return ToolResult(error=f"Failed to switch tab: {e}")
7.3 Opening a New Tab
    async def _open_new_tab(self) -> ToolResult:
        """
        Open a new tab.
        """
        try:
            # Create a new page
            self._page = await self._context.new_page()
            return ToolResult(output="New tab opened")
        except Exception as e:
            return ToolResult(error=f"Failed to open new tab: {e}")
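8. Intercepting Network Requests
    async def _intercept_requests(self, patterns: List[str]) -> ToolResult:
        """
        Intercept network requests that match the given patterns.

        Args:
            patterns: URL patterns (e.g. "**/*.jpg", "**/api/**")

        Returns:
            The list of intercepted requests
        """
        try:
            intercepted = []

            # Set up the interceptor
            async def handle_request(route, request):
                # Record the request if it matches a pattern
                # (_match_pattern is a helper that is not shown here)
                for pattern in patterns:
                    if self._match_pattern(request.url, pattern):
                        intercepted.append({
                            "url": request.url,
                            "method": request.method,
                            "headers": dict(request.headers),
                        })
                        break  # record each request only once

                # Let the request continue (it could also be aborted)
                await route.continue_()

            # Enable interception
            await self._page.route("**/*", handle_request)

            # Wait a while to collect requests, then remove the handler
            await asyncio.sleep(2)
            await self._page.unroute("**/*", handle_request)

            # Format the output
            result = []
            for req in intercepted:
                result.append(f"{req['method']} {req['url']}")

            return ToolResult(output="\n".join(result))
        except Exception as e:
            return ToolResult(error=f"Failed to intercept requests: {e}")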
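The interceptor relies on a _match_pattern helper that the article does not show. One plausible way to write it, offered purely as an assumption, is shell-style matching from the standard library. Note that fnmatch lets "*" match "/" as well, so it is looser than Playwright's own glob rules and only approximates them.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def match_pattern(url: str, pattern: str) -> bool:
    """Case-sensitive shell-style match; '*' matches any characters, including '/'."""
    return fnmatchcase(url, pattern)


def matching_urls(urls: Iterable[str], patterns: Iterable[str]) -> List[str]:
    """Keep the URLs that match at least one pattern, preserving their order."""
    pattern_list = list(patterns)
    return [u for u in urls if any(match_pattern(u, p) for p in pattern_list)]
```

For example, matching_urls(seen_urls, ["**/api/**"]) would keep only the API calls from a list of recorded request URLs.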
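🧠 Smart Content Extraction: Playwright + LLM
One of OpenManus's most powerful features is using an LLM to understand page content.
Scenario: Extracting Product Information
Traditional approach (XPath):
    # Hard-coded XPath breaks as soon as the page changes
    name_el = await page.query_selector("//div[@class='product-title']/h1")
    name = await name_el.inner_text()
    price_el = await page.query_selector("//span[@class='price']")
    price = await price_el.inner_text()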
OpenManus approach (Playwright + LLM):
    # 1. Take a screenshot of the page
    screenshot = await page.screenshot()
    base64_image = base64.b64encode(screenshot).decode()

    # 2. Send it to the LLM
    # (llm.complete is an illustrative call; check the LLM class for the actual method)
    llm_response = await llm.complete(
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Extract product info from this image"},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{base64_image}"},
                    },
                ],
            }
        ]
    )

    # 3. The LLM returns structured data
    # {
    #     "name": "iPhone 15 Pro",
    #     "price": "$999",
    #     "description": "Latest Apple flagship...",
    #     "rating": 4.8
    # }
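Implementation: Search + Extract + Summarize in OpenManus
    async def search_and_extract(agent: Manus, query: str) -> str:
        """
        Search, then extract the key information.

        Steps:
        1. Search for the keywords
        2. Visit a result page
        3. Take a screenshot
        4. Let the LLM extract the information
        5. Summarize
        """
        # 1. Search
        search_result = await agent.run(f"Search: {query}")

        # 2. Look up the browser tool (registered under the name "browser_use")
        browser = agent.available_tools.get_tool("browser_use")

        # Assume the search result contains URLs (extract_urls is not shown)
        urls = extract_urls(search_result)
        if not urls:
            return "No results found"

        # 3. Visit the first result (goto/screenshot are illustrative helpers)
        await browser.goto(urls[0])

        # 4. Take a screenshot
        screenshot_result = await browser.screenshot()

        # 5. Use the LLM to extract the information
        extract_prompt = """
        Extract the following from this screenshot:
        - Title
        - Key points (3-5)
        - Summary (one paragraph)

        Return the result as JSON.
        """

        image_url = f"data:image/png;base64,{screenshot_result.base64_image}"
        extraction = await agent.llm.complete(
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": extract_prompt},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ]
        )

        return extraction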
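The message payload is the part of this flow that is easiest to get subtly wrong. The sketch below builds a single user message that carries both the prompt and the screenshot, assuming an OpenAI-style chat format in which image_url is an object with a url field; build_vision_message is a hypothetical helper, and other providers may expect a different shape.

```python
import base64
from typing import Any, Dict, List


def build_vision_message(prompt: str, png_bytes: bytes) -> List[Dict[str, Any]]:
    """Build one user message with a text part and an inline PNG image part."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded}"},
                },
            ],
        }
    ]
```

Keeping the text and the image in one message, rather than two consecutive user messages, makes it unambiguous which image the instruction refers to.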
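🔒 Security Best Practices
1. Content Filtering (Preventing XSS)
    import re

    def sanitize_html(html: str) -> str:
        """Strip dangerous markup from HTML (best effort; regexes are not a full sanitizer)."""
        # Remove script tags, regardless of letter case
        html = re.sub(
            r'<script\b.*?</script\s*>', '', html, flags=re.DOTALL | re.IGNORECASE
        )
        # Remove inline event handlers such as onclick="..."
        html = re.sub(r'\son\w+="[^"]*"', '', html, flags=re.IGNORECASE)
        html = re.sub(r"\son\w+='[^']*'", '', html, flags=re.IGNORECASE)
        return html

    # Clean the content before extracting from it
    html = await page.content()
    safe_html = sanitize_html(html)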
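2. URL Allow-List
    from urllib.parse import urlparse

    class SecureBrowserUseTool(BrowserUseTool):
        ALLOWED_DOMAINS = [
            "github.com",
            "stackoverflow.com",
            "wikipedia.org",
        ]

        async def _goto(self, url: str) -> ToolResult:
            # Validate the host; hostname drops any port that netloc would include
            host = (urlparse(url).hostname or "").lower()

            # Accept the listed domains and their subdomains (e.g. en.wikipedia.org)
            allowed = any(
                host == d or host.endswith("." + d) for d in self.ALLOWED_DOMAINS
            )
            if not allowed:
                return ToolResult(error=f"Domain not allowed: {host}")

            return await super()._goto(url)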
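Domain checks are easy to get wrong at the edges: ports, subdomains, look-alike hosts, and non-HTTP schemes. The standalone sketch below shows one way to handle those cases with the standard library; is_allowed is a hypothetical helper, and the domain list is simply copied from the example above.

```python
from typing import Iterable
from urllib.parse import urlparse

ALLOWED_DOMAINS = ("github.com", "stackoverflow.com", "wikipedia.org")


def is_allowed(url: str, allowed: Iterable[str] = ALLOWED_DOMAINS) -> bool:
    """True for http(s) URLs whose host is an allowed domain or one of its subdomains."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return False  # rejects file:, javascript:, data: and similar schemes
    host = (parsed.hostname or "").lower()  # hostname excludes any port
    # The leading dot stops look-alikes such as notgithub.com from matching
    return any(host == d or host.endswith("." + d) for d in allowed)
```

Comparing against the parsed hostname instead of a substring of the URL is what keeps hosts like github.com.evil.com out.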
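3. Resource Limits
    # Options for limiting what the browser loads (illustrative settings)
    context_options = {
        # Fixed viewport size; this bounds rendering and screenshot size, not memory
        "viewport": {"width": 1920, "height": 1080},
        # Resource types to skip (images, CSS, fonts, media). This is not a built-in
        # Playwright option; enforce it with a page.route() handler
        "blocked_resources": ["image", "stylesheet", "font", "media"],
        # Timeout in milliseconds
        "timeout": 30000,
    }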
4. Sandboxing
    # Run the browser inside a Docker container; the container provides the isolation
    browser = await self._playwright.chromium.launch(
        executable_path="/usr/bin/google-chrome",
        args=[
            "--no-sandbox",  # turns off Chromium's own sandbox; often needed in Docker
            "--disable-setuid-sandbox",
            "--disable-gpu",
            "--disable-dev-shm-usage",
        ],
    )
📊 Performance Optimization
1. Reusing the Browser
    from playwright.async_api import async_playwright

    class BrowserUseTool(BaseTool):
        # Class variables, shared globally
        _playwright = None
        _browser = None
        # Defaults for the per-instance context and page
        _context = None
        _page = None

        async def setup(self):
            """Create the browser instance only once."""
            if not BrowserUseTool._playwright:
                BrowserUseTool._playwright = await async_playwright().start()

            if not BrowserUseTool._browser:
                BrowserUseTool._browser = await self._playwright.chromium.launch()

            # Each tool instance gets its own Context and Page
            if not self._context:
                self._context = await self._browser.new_context()

            if not self._page:
                self._page = await self._context.new_page()
2. Parallel Operations
    # Screenshot several elements at the same time
    async def screenshot_multiple(self, selectors: List[str]) -> List[bytes]:
        tasks = [
            self._page.locator(selector).screenshot()
            for selector in selectors
        ]

        # Run them concurrently
        screenshots = await asyncio.gather(*tasks)

        return screenshots
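Launching every task at once can overload a page when the list is long. A common refinement, sketched here with fake jobs rather than real screenshots, is to cap concurrency with a semaphore; gather_limited is a hypothetical helper and the default limit of 3 is arbitrary.

```python
import asyncio
from typing import Awaitable, Callable, List


async def gather_limited(
    jobs: List[Callable[[], Awaitable[bytes]]], limit: int = 3
) -> List[bytes]:
    """Run all jobs concurrently, but never more than `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(job: Callable[[], Awaitable[bytes]]) -> bytes:
        async with semaphore:  # waits here while `limit` jobs are already running
            return await job()

    # gather returns results in the same order as the jobs were given
    return list(await asyncio.gather(*(run(job) for job in jobs)))
```

Each job is passed as a zero-argument callable rather than an already-created coroutine, so no work starts until the semaphore admits it.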
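3. Network Interception
    # Block image loading to speed things up
    await self._page.route("**/*.png", lambda route: route.abort())
    await self._page.route("**/*.jpg", lambda route: route.abort())
    await self._page.route("**/*.gif", lambda route: route.abort())

    # No catch-all route is needed: requests that match no route proceed normally.
    # Routes run in reverse registration order, so a "**/*" handler registered last
    # that calls route.continue_() would run first and let the images through.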
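4. Content Caching
    from typing import Dict

    # functools.lru_cache does not work on async methods: it would cache the
    # coroutine object, which can be awaited only once. Cache finished results.
    _visit_cache: Dict[str, ToolResult] = {}

    class CachedBrowserUseTool(BrowserUseTool):
        async def _goto(self, url: str) -> ToolResult:
            """Cache page visit results (unbounded, for illustration)."""
            if url not in _visit_cache:
                _visit_cache[url] = await super()._goto(url)
            return _visit_cache[url]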
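Caching the result of an async call needs some care, because a coroutine object can be awaited only once and therefore cannot be stored and replayed. The sketch below caches finished results instead; AsyncResultCache and fake_fetch are hypothetical names, and holding the lock during the fetch is a simplification that serializes all lookups.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class AsyncResultCache:
    """Caches the awaited result per URL so each URL is fetched at most once."""

    def __init__(self, fetch: Callable[[str], Awaitable[str]]) -> None:
        self._fetch = fetch
        self._results: Dict[str, str] = {}
        self._lock = asyncio.Lock()

    async def get(self, url: str) -> str:
        async with self._lock:
            if url not in self._results:
                self._results[url] = await self._fetch(url)
            return self._results[url]


async def demo() -> int:
    calls = []

    async def fake_fetch(url: str) -> str:
        calls.append(url)  # record every real fetch
        return f"content of {url}"

    cache = AsyncResultCache(fake_fetch)
    await cache.get("https://example.com")
    await cache.get("https://example.com")  # served from the cache
    return len(calls)
```

A production cache would also want an eviction policy and an expiry time, since a page visited a minute ago may already have changed.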
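🔧 Debugging Tips
1. Debugging in Headed Mode
    # In OpenManus, headless mode comes from config.browser_config
    # (see _ensure_browser_initialized). With Playwright directly,
    # launch a visible, slowed-down browser:
    browser = await playwright.chromium.launch(
        headless=False,  # show the browser window
        slow_mo=500,     # delay each operation by 500 ms so it is easy to watch
    )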
2. Video Recording
    # Record the whole session
    context = await browser.new_context(
        record_video_dir="videos/",
        record_video_size={"width": 1920, "height": 1080},
    )

    # Perform the operations...

    # The video is saved when the context is closed
    await context.close()
3. Live Logging
    # Log every network request and response
    page.on("request", lambda request: print(f">> {request.method} {request.url}"))
    page.on("response", lambda response: print(f"<< {response.status} {response.url}"))

    # Log the page's console output
    page.on("console", lambda msg: print(f"PAGE LOG: {msg.text}"))
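4. Saving Page State
    # Save the HTML
    html = await page.content()
    with open("page.html", "w", encoding="utf-8") as f:
        f.write(html)

    # Save a screenshot
    await page.screenshot(path="page.png")

    # Save a HAR (HTTP Archive of all network requests): enable it when creating
    # the context; the file is written when the context is closed
    context = await browser.new_context(record_har_path="network.har")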
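🎓 Best Practices Summary
✅ Do
1. Reuse the browser instance: avoid launching it repeatedly
2. Choose selectors deliberately: prefer CSS selectors, then text, and XPath last
3. Set timeouts: prevent waiting forever
4. Handle errors: catch exceptions around every operation
5. Clean up resources: close the browser when the work is done
6. Let the LLM help: screenshot + LLM to understand page content
7. Wait properly: wait for elements to finish loading
8. Limit screenshot size: avoid returning oversized Base64 images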
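❌ Don't
1. Launch the browser repeatedly: the performance cost is high
2. Hard-code XPath: it breaks as soon as the page changes
3. Wait indefinitely: always set a timeout
4. Ignore errors: unhandled exceptions hide failures
5. Skip cleanup: it leads to memory leaks
6. Send uncompressed screenshots: they transfer large amounts of data
7. Bypass auto-waiting: acting before elements have loaded causes flaky failures
8. Ignore browser security: XSS, CSRF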
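Two of the points above, setting timeouts and handling errors, combine naturally into one small wrapper. The following is a generic asyncio sketch rather than anything from OpenManus; with_timeout_and_retry is a hypothetical helper, and the choice to retry only on timeouts and ConnectionError is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_timeout_and_retry(
    action: Callable[[], Awaitable[T]],
    timeout: float = 5.0,
    retries: int = 2,
) -> T:
    """Run action() with a per-attempt timeout, retrying transient failures."""
    last_error: Exception = RuntimeError("action was never attempted")
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(action(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            await asyncio.sleep(0.01 * (attempt + 1))  # small linear backoff
    raise last_error
```

A browser action would be wrapped as with_timeout_and_retry(lambda: page.goto(url), timeout=30), so a hung navigation fails after a bounded time instead of blocking the agent.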
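📚 Summary
This article examined how BrowserUseTool is implemented:
Core Capabilities
1. 30+ browser operations: navigation, interaction, scrolling, screenshots, extraction, forms, tabs
2. Playwright integration: a powerful browser automation library
3. Smart waiting: waits automatically for elements to load and become clickable
4. Multiple locators: CSS selectors, text, XPath
5. Content extraction: text, links, images
6. Screenshots: full page, single element, Base64 encoding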
Architecture
    BrowserUseTool
        ↓
    browser_use (Browser, BrowserContext, DomService)
        ↓
    Playwright (Browser, Context, Page)
        ↓
    Chromium / Firefox / WebKit
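Key Techniques
• Auto-waiting: no manual sleep calls needed
• Context isolation: separate cookies and sessions
• Smart scrolling: scrolls to the element automatically
• Screenshot + LLM: the AI understands page content
• Base64 encoding: images embedded in messages
Use Cases
1. Automated testing: an alternative to Selenium
2. Data scraping: crawling dynamic pages
3. Form filling: automatic submission
4. Monitoring: detecting page changes
5. AI agents: browsing the web and gathering information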
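Previous Articles
Part 4: examines how MCP (Model Context Protocol) is integrated, and how MCP lets tool capabilities be extended without limit.
Part 3: "The Design Philosophy of the Tool System – Building an Extensible AI Tool Ecosystem"
The third article in the series examines the design philosophy of the tool system, core components such as BaseTool, ToolResult, and ToolCollection, and the magic of dynamic parameter parsing.
Part 2: examines each layer of the Agent architecture, including state management, the message loop, and the ReAct pattern.
Part 1: "Exploring OpenManus: Understanding an Open-Source AI Agent Framework from Its Source Code"
The first article in the series walks through the source code architecture to give a solid understanding of this powerful AI agent framework.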
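🎯 Coming Up Next
Part 6: "A Secure Sandbox for Code Execution"
We will dig into the PythonExecute tool and cover:
• Process isolation (multiprocessing)
• Timeout control (asyncio.wait_for)
• Output capture (stdout/stderr)
• The Docker sandbox in detail
• Resource limits (CPU, memory)
• Security best practices
Stay tuned!
Keywords: #OpenManus #BrowserAutomation #Playwright #WebScraping #Screenshots #ContentExtraction #WebAutomation #AIAgent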
夜雨聆风
