Workerman源码分析与原理解读
Workerman是纯PHP实现的高性能Socket服务器框架,本文深入分析其核心原理和源码实现。
一、Workerman架构概述
1.1 Workerman特点
【Workerman vs Swoole】
┌─────────────────┬─────────────────┬─────────────────┐
│ 特性 │ Workerman │ Swoole │
├─────────────────┼─────────────────┼─────────────────┤
│ 实现语言 │ 纯PHP │ C/C++扩展 │
│ 安装方式 │ Composer │ pecl/编译 │
│ 协程支持 │ 需要扩展 │ 原生支持 │
│ 学习曲线 │ 平缓 │ 较陡 │
│ 性能 │ 高 │ 更高 │
│ 跨平台 │ 支持Windows │ 主要Linux │
│ 调试 │ 方便 │ 相对复杂 │
└─────────────────┴─────────────────┴─────────────────┘
1.2 进程模型
【Workerman进程模型】
┌─────────────────┐
│ Master进程 │
│ (主进程) │
└────────┬────────┘
│
│ fork
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ 工作进程 │ │ 工作进程 │ │ 工作进程 │
│ │ │ │ │ │
│ Event │ │ Event │ │ Event │
│ Loop │ │ Loop │ │ Loop │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌──────┴──────┐
│ Clients │
│ 客户端 │
└─────────────┘
特点:
- 多进程模型
- 每个Worker独立运行事件循环
- 进程间通过信号通信
- 支持平滑重启
二、Worker核心源码分析
2.1 Worker类结构
<?php
/**
* Workerman Worker类核心源码分析
* 文件位置:Workerman/Worker.php
*/
namespaceWorkerman;
classWorker{
/**
* Worker名称
*/
public $name = 'none';
/**
* Worker进程数
*/
public $count = 1;
/**
* 监听地址
*/
protected $_socketName = '';
/**
* 监听Socket
*/
protected $_mainSocket = null;
/**
* 所有Worker实例
*/
protectedstatic $_workers = [];
/**
* 所有Worker进程的PID
*/
protectedstatic $_pidMap = [];
/**
* 主进程PID
*/
protectedstatic $_masterPid = 0;
/**
* 事件循环实例
*/
protectedstatic $_globalEvent = null;
/**
* 回调函数
*/
public $onConnect = null;
public $onMessage = null;
public $onClose = null;
public $onError = null;
public $onBufferFull = null;
public $onBufferDrain = null;
public $onWorkerStart = null;
public $onWorkerStop = null;
/**
* 构造函数
*/
publicfunction__construct($socket_name = '', array $context_option = []){
// 保存Worker实例
$this->workerId = spl_object_hash($this);
static::$_workers[$this->workerId] = $this;
// 解析监听地址
$this->_socketName = $socket_name;
// 创建上下文
if ($socket_name) {
$this->_context = stream_context_create($context_option);
}
}
/**
* 运行所有Worker
*/
publicstaticfunctionrunAll(){
// 检查运行环境
static::checkSapiEnv();
// 初始化
static::init();
// 解析命令
static::parseCommand();
// 守护进程化
static::daemonize();
// 初始化Worker
static::initWorkers();
// 安装信号处理器
static::installSignal();
// 保存主进程PID
static::saveMasterPid();
// 显示启动界面
static::displayUI();
// Fork工作进程
static::forkWorkers();
// 重置标准输入输出
static::resetStd();
// 监控工作进程
static::monitorWorkers();
}
/**
* Fork工作进程
*/
protectedstaticfunctionforkWorkers(){
foreach (static::$_workers as $worker) {
// 根据count创建多个进程
while (count(static::$_pidMap[$worker->workerId]) < $worker->count) {
static::forkOneWorker($worker);
}
}
}
/**
* Fork单个工作进程
*/
protectedstaticfunctionforkOneWorker(self $worker){
$pid = pcntl_fork();
if ($pid > 0) {
// 主进程
static::$_pidMap[$worker->workerId][$pid] = $pid;
} elseif ($pid === 0) {
// 子进程
// 重置信号处理器
static::resetSignal();
// 设置进程标题
static::setProcessTitle('WorkerMan: worker process ' . $worker->name);
// 运行Worker
$worker->run();
exit(0);
} else {
thrownewException("Fork failed");
}
}
/**
* 运行Worker(子进程中执行)
*/
publicfunctionrun(){
// 更新进程状态
static::$_status = static::STATUS_RUNNING;
// 注册shutdown函数
register_shutdown_function([__CLASS__, 'checkErrors']);
// 创建全局事件循环
if (!static::$_globalEvent) {
$event_loop_class = static::getEventLoopName();
static::$_globalEvent = new $event_loop_class;
}
// 监听端口
$this->listen();
// 触发onWorkerStart回调
if ($this->onWorkerStart) {
call_user_func($this->onWorkerStart, $this);
}
// 运行事件循环
static::$_globalEvent->loop();
}
/**
* 监听端口
*/
publicfunctionlisten(){
if (!$this->_socketName) {
return;
}
// 创建监听Socket
$this->_mainSocket = stream_socket_server(
$this->_socketName,
$errno,
$errmsg,
STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
$this->_context
);
if (!$this->_mainSocket) {
thrownewException($errmsg);
}
// 设置非阻塞
stream_set_blocking($this->_mainSocket, false);
// 注册读事件(接受新连接)
static::$_globalEvent->add(
$this->_mainSocket,
EventInterface::EV_READ,
[$this, 'acceptConnection']
);
}
/**
* 接受新连接
*/
publicfunctionacceptConnection($socket){
// 接受连接
$new_socket = @stream_socket_accept($socket, 0, $remote_address);
if (!$new_socket) {
return;
}
// 创建TcpConnection对象
$connection = new TcpConnection($new_socket, $remote_address);
// 设置回调
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferFull = $this->onBufferFull;
$connection->onBufferDrain = $this->onBufferDrain;
// 触发onConnect回调
if ($this->onConnect) {
call_user_func($this->onConnect, $connection);
}
}
}
2.2 使用示例
<?php
useWorkerman\Worker;
useWorkerman\Connection\TcpConnection;
require_once__DIR__ . '/vendor/autoload.php';
// 创建TCP服务器
$worker = new Worker('tcp://0.0.0.0:8080');
// 设置进程数
$worker->count = 4;
// 设置进程名称
$worker->name = 'MyTcpServer';
// 连接建立时
$worker->onConnect = function(TcpConnection $connection){
echo"New connection from {$connection->getRemoteIp()}\n";
};
// 收到数据时
$worker->onMessage = function(TcpConnection $connection, $data){
$connection->send("Hello, you sent: $data");
};
// 连接关闭时
$worker->onClose = function(TcpConnection $connection){
echo"Connection closed\n";
};
// 运行
Worker::runAll();
三、事件循环源码分析
3.1 EventLoop接口
<?php
/**
* Workerman事件循环接口
* 文件位置:Workerman/Events/EventInterface.php
*/
namespaceWorkerman\Events;
interfaceEventInterface{
// 事件类型
const EV_READ = 1; // 读事件
const EV_WRITE = 2; // 写事件
const EV_SIGNAL = 4; // 信号事件
const EV_TIMER = 8; // 定时器事件
const EV_TIMER_ONCE = 16; // 一次性定时器
/**
* 添加事件监听
*/
publicfunctionadd($fd, $flag, $func, $args = []);
/**
* 删除事件监听
*/
publicfunctiondel($fd, $flag);
/**
* 删除所有定时器
*/
publicfunctionclearAllTimer();
/**
* 运行事件循环
*/
publicfunctionloop();
/**
* 停止事件循环
*/
publicfunctiondestroy();
/**
* 获取定时器数量
*/
publicfunctiongetTimerCount();
}
3.2 Select事件循环实现
<?php
/**
* Select事件循环实现
* 文件位置:Workerman/Events/Select.php
*
* 这是Workerman的默认事件循环实现
* 使用PHP原生的stream_select函数
*/
namespaceWorkerman\Events;
classSelectimplementsEventInterface{
/**
* 读事件回调
*/
protected $_readFds = [];
/**
* 写事件回调
*/
protected $_writeFds = [];
/**
* 信号处理器
*/
protected $_signalHandlers = [];
/**
* 定时器
*/
protected $_timers = [];
/**
* 定时器ID
*/
protected $_timerId = 0;
/**
* 选择超时时间
*/
protected $_selectTimeout = 100000000; // 100ms
/**
* 添加事件
*/
publicfunctionadd($fd, $flag, $func, $args = []){
switch ($flag) {
caseself::EV_READ:
$fd_key = (int)$fd;
$this->_readFds[$fd_key] = $fd;
$this->_readCallbacks[$fd_key] = [$func, $args];
break;
caseself::EV_WRITE:
$fd_key = (int)$fd;
$this->_writeFds[$fd_key] = $fd;
$this->_writeCallbacks[$fd_key] = [$func, $args];
break;
caseself::EV_SIGNAL:
$this->_signalHandlers[$fd] = $func;
pcntl_signal($fd, [$this, 'signalHandler']);
break;
caseself::EV_TIMER:
caseself::EV_TIMER_ONCE:
$timer_id = ++$this->_timerId;
$run_time = microtime(true) + $fd;
$this->_timers[$timer_id] = [
'run_time' => $run_time,
'interval' => $flag === self::EV_TIMER ? $fd : 0,
'func' => $func,
'args' => $args,
];
return $timer_id;
}
returntrue;
}
/**
* 删除事件
*/
publicfunctiondel($fd, $flag){
switch ($flag) {
caseself::EV_READ:
$fd_key = (int)$fd;
unset($this->_readFds[$fd_key], $this->_readCallbacks[$fd_key]);
break;
caseself::EV_WRITE:
$fd_key = (int)$fd;
unset($this->_writeFds[$fd_key], $this->_writeCallbacks[$fd_key]);
break;
caseself::EV_SIGNAL:
pcntl_signal($fd, SIG_IGN);
unset($this->_signalHandlers[$fd]);
break;
caseself::EV_TIMER:
caseself::EV_TIMER_ONCE:
unset($this->_timers[$fd]);
break;
}
returntrue;
}
/**
* 事件循环
*/
publicfunctionloop(){
while (true) {
// 处理信号
pcntl_signal_dispatch();
// 计算select超时时间
$timeout = $this->calculateTimeout();
// 准备文件描述符
$read = $this->_readFds;
$write = $this->_writeFds;
$except = [];
// 调用stream_select
if ($read || $write) {
$ret = @stream_select($read, $write, $except, 0, $timeout);
} else {
usleep($timeout);
$ret = 0;
}
// 处理读事件
if ($read) {
foreach ($read as $fd) {
$fd_key = (int)$fd;
if (isset($this->_readCallbacks[$fd_key])) {
call_user_func_array(
$this->_readCallbacks[$fd_key][0],
array_merge([$fd], $this->_readCallbacks[$fd_key][1])
);
}
}
}
// 处理写事件
if ($write) {
foreach ($write as $fd) {
$fd_key = (int)$fd;
if (isset($this->_writeCallbacks[$fd_key])) {
call_user_func_array(
$this->_writeCallbacks[$fd_key][0],
array_merge([$fd], $this->_writeCallbacks[$fd_key][1])
);
}
}
}
// 处理定时器
$this->processTimers();
}
}
/**
* 处理定时器
*/
protectedfunctionprocessTimers(){
$now = microtime(true);
foreach ($this->_timers as $timer_id => $timer) {
if ($timer['run_time'] <= $now) {
// 执行回调
call_user_func_array($timer['func'], $timer['args']);
// 周期定时器重新计算执行时间
if ($timer['interval'] > 0) {
$this->_timers[$timer_id]['run_time'] = $now + $timer['interval'];
} else {
// 一次性定时器删除
unset($this->_timers[$timer_id]);
}
}
}
}
/**
* 计算select超时时间
*/
protectedfunctioncalculateTimeout(){
if (empty($this->_timers)) {
return$this->_selectTimeout;
}
$min_time = PHP_INT_MAX;
$now = microtime(true);
foreach ($this->_timers as $timer) {
$diff = ($timer['run_time'] - $now) * 1000000;
if ($diff < $min_time) {
$min_time = $diff;
}
}
return max(0, min($min_time, $this->_selectTimeout));
}
}
四、TcpConnection连接管理
4.1 TcpConnection类
<?php
/**
* TCP连接类源码分析
* 文件位置:Workerman/Connection/TcpConnection.php
*/
namespaceWorkerman\Connection;
classTcpConnectionextendsConnectionInterface{
/**
* 连接状态
*/
const STATUS_INITIAL = 0;
const STATUS_CONNECTING = 1;
const STATUS_ESTABLISHED = 2;
const STATUS_CLOSING = 4;
const STATUS_CLOSED = 8;
/**
* Socket资源
*/
protected $_socket = null;
/**
* 发送缓冲区
*/
protected $_sendBuffer = '';
/**
* 接收缓冲区
*/
protected $_recvBuffer = '';
/**
* 当前连接状态
*/
protected $_status = self::STATUS_ESTABLISHED;
/**
* 远程地址
*/
protected $_remoteAddress = '';
/**
* 回调函数
*/
public $onMessage = null;
public $onClose = null;
public $onError = null;
public $onBufferFull = null;
public $onBufferDrain = null;
/**
* 应用层协议
*/
public $protocol = null;
/**
* 构造函数
*/
publicfunction__construct($socket, $remote_address = ''){
$this->_socket = $socket;
$this->_remoteAddress = $remote_address;
// 设置非阻塞
stream_set_blocking($this->_socket, false);
// 设置读写缓冲区
stream_set_read_buffer($this->_socket, 0);
// 注册读事件
Worker::$globalEvent->add(
$this->_socket,
EventInterface::EV_READ,
[$this, 'baseRead']
);
}
/**
* 读取数据
*/
publicfunctionbaseRead($socket){
// 读取数据
$buffer = @fread($socket, 65535);
// 连接关闭
if ($buffer === '' || $buffer === false) {
if (feof($socket) || !is_resource($socket)) {
$this->destroy();
return;
}
}
// 添加到接收缓冲区
$this->_recvBuffer .= $buffer;
// 如果有协议,使用协议解析
if ($this->protocol) {
while ($this->_recvBuffer !== '') {
// 检查数据包是否完整
$length = $this->protocol::input($this->_recvBuffer, $this);
if ($length === 0) {
// 数据不完整,等待更多数据
break;
}
// 提取完整数据包
$package = substr($this->_recvBuffer, 0, $length);
$this->_recvBuffer = substr($this->_recvBuffer, $length);
// 解码数据
$data = $this->protocol::decode($package, $this);
// 触发onMessage回调
if ($this->onMessage) {
call_user_func($this->onMessage, $this, $data);
}
}
} else {
// 无协议,直接触发回调
if ($this->onMessage) {
call_user_func($this->onMessage, $this, $this->_recvBuffer);
}
$this->_recvBuffer = '';
}
}
/**
* 发送数据
*/
publicfunctionsend($data, $raw = false){
if ($this->_status === self::STATUS_CLOSED) {
returnfalse;
}
// 协议编码
if (!$raw && $this->protocol) {
$data = $this->protocol::encode($data, $this);
}
// 如果发送缓冲区为空,直接发送
if ($this->_sendBuffer === '') {
$len = @fwrite($this->_socket, $data);
// 全部发送成功
if ($len === strlen($data)) {
returntrue;
}
// 部分发送成功
if ($len > 0) {
$this->_sendBuffer = substr($data, $len);
} else {
$this->_sendBuffer = $data;
}
// 注册写事件
Worker::$globalEvent->add(
$this->_socket,
EventInterface::EV_WRITE,
[$this, 'baseWrite']
);
returnnull;
}
// 添加到发送缓冲区
$this->_sendBuffer .= $data;
// 检查缓冲区是否已满
if (strlen($this->_sendBuffer) >= $this->maxSendBufferSize) {
if ($this->onBufferFull) {
call_user_func($this->onBufferFull, $this);
}
}
returnnull;
}
/**
* 写入数据
*/
publicfunctionbaseWrite(){
$len = @fwrite($this->_socket, $this->_sendBuffer);
if ($len === strlen($this->_sendBuffer)) {
// 全部发送完成
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
$this->_sendBuffer = '';
// 触发缓冲区清空回调
if ($this->onBufferDrain) {
call_user_func($this->onBufferDrain, $this);
}
// 如果正在关闭,销毁连接
if ($this->_status === self::STATUS_CLOSING) {
$this->destroy();
}
} elseif ($len > 0) {
// 部分发送
$this->_sendBuffer = substr($this->_sendBuffer, $len);
}
}
/**
* 关闭连接
*/
publicfunctionclose($data = null){
if ($this->_status === self::STATUS_CLOSED) {
return;
}
// 发送剩余数据
if ($data !== null) {
$this->send($data);
}
// 如果还有数据未发送,标记为正在关闭
if ($this->_sendBuffer !== '') {
$this->_status = self::STATUS_CLOSING;
return;
}
$this->destroy();
}
/**
* 销毁连接
*/
publicfunctiondestroy(){
if ($this->_status === self::STATUS_CLOSED) {
return;
}
// 移除事件监听
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
// 关闭Socket
@fclose($this->_socket);
$this->_status = self::STATUS_CLOSED;
// 触发onClose回调
if ($this->onClose) {
call_user_func($this->onClose, $this);
}
}
/**
* 获取远程IP
*/
publicfunctiongetRemoteIp(){
$pos = strrpos($this->_remoteAddress, ':');
if ($pos) {
return substr($this->_remoteAddress, 0, $pos);
}
return'';
}
/**
* 获取远程端口
*/
publicfunctiongetRemotePort(){
$pos = strrpos($this->_remoteAddress, ':');
if ($pos) {
return (int)substr($this->_remoteAddress, $pos + 1);
}
return0;
}
}
五、协议解析
5.1 协议接口
<?php
/**
* Workerman协议接口
* 文件位置:Workerman/Protocols/ProtocolInterface.php
*/
namespaceWorkerman\Protocols;
interfaceProtocolInterface{
/**
* 检查数据包完整性
*
* @param string $buffer 接收缓冲区
* @param TcpConnection $connection 连接对象
* @return int 返回数据包长度,0表示数据不完整
*/
publicstaticfunctioninput($buffer, $connection);
/**
* 解码数据包
*
* @param string $buffer 完整的数据包
* @param TcpConnection $connection 连接对象
* @return mixed 解码后的数据
*/
publicstaticfunctiondecode($buffer, $connection);
/**
* 编码数据包
*
* @param mixed $data 要发送的数据
* @param TcpConnection $connection 连接对象
* @return string 编码后的数据
*/
publicstaticfunctionencode($data, $connection);
}
5.2 自定义协议示例
<?php
/**
* 自定义协议示例
* 格式:4字节长度 + 数据内容
*/
namespaceProtocols;
classMyProtocol{
/**
* 包头长度
*/
const HEADER_LENGTH = 4;
/**
* 检查数据包完整性
*/
publicstaticfunctioninput($buffer, $connection){
// 数据不足包头长度
if (strlen($buffer) < self::HEADER_LENGTH) {
return0;
}
// 解析包头获取数据长度
$data = unpack('Nlength', $buffer);
$length = $data['length'];
// 检查数据是否完整
if (strlen($buffer) < self::HEADER_LENGTH + $length) {
return0;
}
// 返回完整数据包长度
returnself::HEADER_LENGTH + $length;
}
/**
* 解码
*/
publicstaticfunctiondecode($buffer, $connection){
// 去掉包头,返回数据内容
$data = substr($buffer, self::HEADER_LENGTH);
// 如果是JSON,解析
$decoded = json_decode($data, true);
return $decoded !== null ? $decoded : $data;
}
/**
* 编码
*/
publicstaticfunctionencode($data, $connection){
// 如果是数组,转JSON
if (is_array($data)) {
$data = json_encode($data, JSON_UNESCAPED_UNICODE);
}
// 添加包头
return pack('N', strlen($data)) . $data;
}
}
// 使用自定义协议
$worker = new Worker('MyProtocol://0.0.0.0:8080');
$worker->onMessage = function($connection, $data){
// $data 已经是解码后的数据
$connection->send(['code' => 0, 'msg' => 'success']);
};
5.3 HTTP协议实现
<?php
/**
* HTTP协议简化实现
*/
namespaceWorkerman\Protocols;
classHttp{
/**
* 检查HTTP请求完整性
*/
publicstaticfunctioninput($buffer, $connection){
// 查找\r\n\r\n(头部结束标记)
$pos = strpos($buffer, "\r\n\r\n");
if ($pos === false) {
// 头部不完整
if (strlen($buffer) > 16384) {
$connection->close("HTTP/1.1 413 Request Entity Too Large\r\n\r\n");
return0;
}
return0;
}
// 解析Content-Length
$header = substr($buffer, 0, $pos);
if (preg_match('/Content-Length:\s*(\d+)/i', $header, $matches)) {
$content_length = (int)$matches[1];
return $pos + 4 + $content_length;
}
return $pos + 4;
}
/**
* 解码HTTP请求
*/
publicstaticfunctiondecode($buffer, $connection){
// 分离头部和body
$pos = strpos($buffer, "\r\n\r\n");
$header_str = substr($buffer, 0, $pos);
$body = substr($buffer, $pos + 4);
// 解析请求行
$header_lines = explode("\r\n", $header_str);
$first_line = array_shift($header_lines);
list($method, $uri, $protocol) = explode(' ', $first_line);
// 解析头部
$headers = [];
foreach ($header_lines as $line) {
$parts = explode(':', $line, 2);
if (count($parts) === 2) {
$headers[trim($parts[0])] = trim($parts[1]);
}
}
// 解析URI
$url_info = parse_url($uri);
$path = $url_info['path'] ?? '/';
// 解析GET参数
$get = [];
if (isset($url_info['query'])) {
parse_str($url_info['query'], $get);
}
// 解析POST参数
$post = [];
if ($method === 'POST' && $body) {
$content_type = $headers['Content-Type'] ?? '';
if (strpos($content_type, 'application/json') !== false) {
$post = json_decode($body, true) ?? [];
} else {
parse_str($body, $post);
}
}
// 返回请求对象
returnnew Request($method, $path, $headers, $get, $post, $body);
}
/**
* 编码HTTP响应
*/
publicstaticfunctionencode($data, $connection){
if ($data instanceof Response) {
return (string)$data;
}
// 简单响应
$body = is_array($data) ? json_encode($data) : $data;
$content_type = is_array($data) ? 'application/json' : 'text/html';
return"HTTP/1.1 200 OK\r\n" .
"Content-Type: {$content_type}; charset=utf-8\r\n" .
"Content-Length: " . strlen($body) . "\r\n" .
"Connection: keep-alive\r\n" .
"\r\n" .
$body;
}
}
六、总结
6.1 Workerman核心特点
【Workerman核心特点】
1、纯PHP实现
- 无需安装扩展
- 跨平台支持
- 调试方便
2、多进程模型
- Master-Worker架构
- 进程间隔离
- 平滑重启
3、事件驱动
- 支持多种事件库
- 非阻塞IO
- 高并发处理
4、协议灵活
- 内置HTTP/WebSocket
- 自定义协议简单
- 二进制协议支持
6.2 适用场景
【Workerman适用场景】
✅ 适合:
- 即时通讯(IM)
- 游戏服务器
- 物联网网关
- 推送服务
- API服务
❌ 不适合:
- 需要协程的场景
- 极致性能要求
- CPU密集型任务
📌 下期预告:《Guzzle HTTP客户端源码》
夜雨聆风