libeio库源码分析系列(四)
源码分析mettle后门工具学习 所使用的依赖库
官网:http://securitytech.cc
libeio 线程池初始化流程深度分析
📋 线程池架构概述
基于libeio 1.0.2实际源码分析,线程池系统采用ETP(External Thread Pool)架构,通过eio_init函数初始化,实际调用etp_init函数创建线程池管理器。整个初始化过程涉及互斥锁创建、队列初始化、参数配置等关键步骤。
🏗️ 核心数据结构
线程池管理结构 (struct etp_pool)
/** * 源码位置: etp.c line 136-160 * 实际的线程池管理结构 */structetp_pool{ void*userdata; // 用户数据指针etp_reqqreq_queue; // 请求队列etp_reqqres_queue; // 结果队列unsigned intstarted, idle, wanted; // 线程计数器unsigned intmax_poll_time; // 最大轮询时间unsigned intmax_poll_reqs; // 最大轮询请求数unsigned intnreqs; // 请求计数 (reqlock保护)unsigned intnready; // 就绪计数 (reqlock保护)unsigned intnpending; // 挂起计数 (reqlock保护)unsigned intmax_idle; // 最大空闲线程数unsigned intidle_timeout; // 空闲超时时间(秒)void (*want_poll_cb) (void*userdata); // 轮询需求回调void (*done_poll_cb) (void*userdata); // 轮询完成回调xmutex_twrklock; // 工作线程互斥锁xmutex_treslock; // 结果队列互斥锁xmutex_treqlock; // 请求队列互斥锁xcond_treqwait; // 请求等待条件变量etp_workerwrk_first; // 工作线程链表头节点};
工作线程结构 (struct etp_worker)
/** * 源码位置: etp.c line 120-134 * 实际的工作线程结构 */typedefstructetp_worker{ etp_poolpool; // 所属线程池指针structetp_tmpbuftmpbuf; // 临时缓冲区/* locked by pool->wrklock */structetp_worker*prev, *next; // 双向链表指针xthread_ttid; // 线程ID#ifdefETP_WORKER_COMMONETP_WORKER_COMMON// 可选的通用字段#endif} etp_worker;
请求队列结构 (etp_reqq)
/** * 源码位置: etp.c line 110-115 * 多优先级请求队列实现 */typedefstruct{ ETP_REQ*qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */intsize; } etp_reqq;
🔧 线程池初始化完整流程
1. 入口函数 eio_init
/** * 源码位置: eio.c line 1846-1851 * libeio对外暴露的初始化接口 */intecb_coldeio_init (void (*want_poll)(void), void (*done_poll)(void)) { eio_want_poll_cb=want_poll; // 设置轮询需求回调eio_done_poll_cb=done_poll; // 设置轮询完成回调returnetp_init (EIO_POOL, 0, 0, 0); // 调用ETP初始化}
2. 核心初始化函数 etp_init
/** * 源码位置: etp.c line 287-315 * ETP线程池初始化实现 */ETP_API_DECLintecb_coldetp_init (etp_poolpool, void*userdata, void (*want_poll)(void*userdata), void (*done_poll)(void*userdata)) { // 🔒 创建各种同步原语X_MUTEX_CREATE (pool->wrklock); // 工作线程锁X_MUTEX_CREATE (pool->reslock); // 结果队列锁X_MUTEX_CREATE (pool->reqlock); // 请求队列锁X_COND_CREATE (pool->reqwait); // 请求等待条件变量// 📋 初始化请求和结果队列reqq_init (&pool->req_queue); // 初始化请求队列reqq_init (&pool->res_queue); // 初始化结果队列// 🔗 初始化工作线程链表pool->wrk_first.next=pool->wrk_first.prev=&pool->wrk_first; // 📊 初始化计数器和配置参数pool->started=0; // 已启动线程数pool->idle=0; // 空闲线程数pool->nreqs=0; // 总请求数pool->nready=0; // 就绪请求数pool->npending=0; // 挂起请求数pool->wanted=4; // 期望的线程数// ⚙️ 设置线程池参数pool->max_idle=4; // 最大空闲线程数pool->idle_timeout=10; // 空闲超时时间(秒)// 🎯 设置回调函数pool->userdata=userdata; pool->want_poll_cb=want_poll; pool->done_poll_cb=done_poll; return0; }
3. 队列初始化函数 reqq_init
/** * 源码位置: etp.c line 233-242 * 请求队列初始化实现 */staticvoidecb_coldreqq_init (etp_reqq*q) { intpri; for (pri=0; pri<ETP_NUM_PRI; ++pri) q->qs[pri] =q->qe[pri] =0; // 初始化各优先级队列为空q->size=0; // 队列大小清零}
4. 线程创建工作流程
/** * 源码位置: etp.c line 442-460 * 线程启动函数实现 */staticvoidecb_coldetp_start_thread (etp_poolpool) { etp_worker*wrk=calloc (1, sizeof (etp_worker)); // 分配工作线程结构wrk->pool=pool; // 设置所属线程池X_LOCK (pool->wrklock); // 获取工作线程锁if (xthread_create (&wrk->tid, etp_proc, (void*)wrk)) // 创建线程 { // 🔗 将新线程加入双向链表wrk->prev=&pool->wrk_first; wrk->next=pool->wrk_first.next; pool->wrk_first.next->prev=wrk; pool->wrk_first.next=wrk; ++pool->started; // 增加启动计数 } elsefree (wrk); // 创建失败则释放内存X_UNLOCK (pool->wrklock); // 释放工作线程锁}
5. 线程入口函数 etp_proc
/** * 源码位置: etp.c line 334-425 * 工作线程主函数实现 */X_THREAD_PROC (etp_proc) { ETP_REQ*req; // 请求指针structtimespects; // 时间结构体etp_worker*self= (etp_worker*)thr_arg; // 获取工作线程指针etp_poolpool=self->pool; // 获取线程池指针etp_proc_init (); // 线程初始化/* try to distribute timeouts somewhat evenly */ts.tv_nsec= ((intptr_t)self&1023UL) * (1000000000UL / 1024UL); for (;;) // 主循环开始 { ts.tv_sec=0; X_LOCK (pool->reqlock); // 🔒 锁定请求队列for (;;) // 请求获取循环 { req=reqq_shift (&pool->req_queue); // 从队列获取请求if (ecb_expect_true (req)) // ✅ 获取到请求break; if (ts.tv_sec==1) // ⏰ 超时检测,退出线程 { X_UNLOCK (pool->reqlock); X_LOCK (pool->wrklock); --pool->started; X_UNLOCK (pool->wrklock); goto quit; } ++pool->idle; // 增加空闲计数if (pool->idle <= pool->max_idle) // 未超过最大空闲数X_COND_WAIT (pool->reqwait, pool->reqlock); // 无限期等待else { if (!ts.tv_sec) // 首次设置超时ts.tv_sec=time (0) +pool->idle_timeout; if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) ==ETIMEDOUT) ts.tv_sec=1; // 超时标记 } --pool->idle; // 减少空闲计数 } --pool->nready; // 减少就绪计数X_UNLOCK (pool->reqlock); // 🔓 解锁请求队列if (ecb_expect_false (req->type==ETP_TYPE_QUIT)) // 退出请求 goto quit; ETP_EXECUTE (self, req); // 🎯 执行请求X_LOCK (pool->reslock); // 🔒 锁定结果队列++pool->npending; // 增加挂起计数if (!reqq_push (&pool->res_queue, req)) // 推入结果队列ETP_WANT_POLL (pool); // 触发轮询回调etp_worker_clear (self); // 清理工作线程状态X_UNLOCK (pool->reslock); // 🔓 解锁结果队列 }quit: free (req); // 释放请求内存X_LOCK (pool->wrklock); // 🔒 锁定工作线程etp_worker_free (self); // 释放工作线程资源X_UNLOCK (pool->wrklock); // 🔓 解锁工作线程return0; }
🎯 线程创建时机分析
动态线程创建策略
/** * 源码位置: etp.c line 462-487 * 智能线程创建决策函数 */staticvoidetp_maybe_start_thread (etp_poolpool) { // 📊 检查是否已达到期望线程数if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted)) return; // 🧮 计算是否需要新线程if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; // 🚀 启动新工作线程etp_start_thread (pool); }
线程数量控制
/** * 源码位置: etp.c line 520-535 * 线程池参数设置函数 */voidetp_set_max_parallel (etp_poolpool, unsigned intnthreads) { if (nthreads>ETP_MAX_PARALLEL) nthreads=ETP_MAX_PARALLEL; pool->wanted=nthreads ? nthreads : 1; // 设置期望线程数}voidetp_set_max_idle (etp_poolpool, unsigned intnthreads) { pool->max_idle=nthreads; // 设置最大空闲线程数}
🔒 同步机制详解
线程抽象层 (xthread.h)
/** * 源码位置: xthread.h * 跨平台线程抽象实现 */staticintxthread_create (xthread_t*tid, void*(*proc)(void*), void*arg) { intretval; sigset_tfullsigset, oldsigset; pthread_attr_tattr; pthread_attr_init (&attr); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); // 分离线程pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN<X_STACKSIZE ? X_STACKSIZE : PTHREAD_STACK_MIN); sigfillset (&fullsigset); // 屏蔽所有信号pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); retval=pthread_create (tid, &attr, proc, arg) ==0; // 创建线程pthread_sigmask (SIG_SETMASK, &oldsigset, 0); // 恢复信号屏蔽pthread_attr_destroy (&attr); returnretval; }
互斥锁和条件变量
/** * 源码位置: xthread.h * POSIX线程同步原语封装 */typedefpthread_mutex_txmutex_t;#defineX_MUTEX_CREATE(mutex) pthread_mutex_init (&(mutex), 0)#defineX_LOCK(mutex) pthread_mutex_lock (&(mutex))#defineX_UNLOCK(mutex) pthread_mutex_unlock (&(mutex))typedefpthread_cond_txcond_t;#defineX_COND_CREATE(cond) pthread_cond_init (&(cond), 0)#defineX_COND_SIGNAL(cond) pthread_cond_signal (&(cond))#defineX_COND_WAIT(cond,mutex) pthread_cond_wait (&(cond), &(mutex))#defineX_COND_TIMEDWAIT(cond,mutex,to) pthread_cond_timedwait (&(cond), &(mutex), &(to))
📊 初始化调用关系图
用户调用 eio_init(want_poll, done_poll) ↓ 设置全局回调函数 ↓ 调用 etp_init(EIO_POOL, 0, 0, 0) ↓ 创建同步原语 (3个mutex + 1个cond) ↓ 初始化队列 (req_queue, res_queue) ↓ 设置初始参数 (started=0, idle=0, wanted=4) ↓ 设置配置参数 (max_idle=4, idle_timeout=10) ↓ 初始化完成,返回0 ↓ 实际线程创建发生在首次提交任务时 ↓ 通过 etp_maybe_start_thread() 动态创建
⚠️ 实际使用注意事项
1. 必须提供回调函数
// ❌ 错误示例(会导致程序卡死)eio_init(NULL, NULL); // 源码中未做NULL检查// ✅ 正确示例voidwant_poll_callback(void) { /* 处理轮询需求 */ }voiddone_poll_callback(void) { /* 处理轮询完成 */ }eio_init(want_poll_callback, done_poll_callback);
2. 线程创建是惰性的
// 线程池初始化时不创建实际线程// 线程会在首次提交任务时按需创建eio_nop(EIO_PRI_DEFAULT, callback, NULL); // 此时才创建线程
3. 默认配置参数
// 源码中的默认值pool->wanted=4; // 期望4个工作线程pool->max_idle=4; // 最多4个空闲线程pool->idle_timeout=10; // 空闲10秒后退出
🔍 调试和监控
状态查询函数
// 源码提供的状态查询接口unsigned inteio_nreqs(void); // 在途请求数unsigned inteio_nready(void); // 就绪请求数unsigned inteio_npending(void); // 挂起请求数unsigned inteio_nthreads(void); // 工作线程数
内部调试信息
// 可通过修改源码添加调试输出printf("线程池状态: started=%u, idle=%u, nreqs=%u\n", pool->started, pool->idle, pool->nreqs);
-
公众号:安全狗的自我修养
-
vx:2207344074
-
http://gitee.com/haidragon
-
http://github.com/haidragon
-
bilibili:haidragonx
-


夜雨聆风

