乐于分享
好东西不私藏

libeio库源码分析系列(四)

libeio库源码分析系列(四)

源码分析mettle后门工具学习 所使用的依赖库

官网:http://securitytech.cc

libeio 线程池初始化流程深度分析

📋 线程池架构概述

基于libeio 1.0.2实际源码分析,线程池系统采用ETP(External Thread Pool)架构,通过eio_init函数初始化,实际调用etp_init函数创建线程池管理器。整个初始化过程涉及互斥锁创建、队列初始化、参数配置等关键步骤。


🏗️ 核心数据结构

线程池管理结构 (struct etp_pool)

/** * 源码位置: etp.c line 136-160 * 实际的线程池管理结构 */structetp_pool{   void*userdata;                    // 用户数据指针etp_reqqreq_queue;                // 请求队列etp_reqqres_queue;                // 结果队列unsigned intstartedidlewanted// 线程计数器unsigned intmax_poll_time;        // 最大轮询时间unsigned intmax_poll_reqs;        // 最大轮询请求数unsigned intnreqs;                // 请求计数 (reqlock保护)unsigned intnready;               // 就绪计数 (reqlock保护)unsigned intnpending;             // 挂起计数 (reqlock保护)unsigned intmax_idle;             // 最大空闲线程数unsigned intidle_timeout;         // 空闲超时时间(秒)void (*want_poll_cb) (void*userdata);  // 轮询需求回调void (*done_poll_cb) (void*userdata);  // 轮询完成回调xmutex_twrklock;                  // 工作线程互斥锁xmutex_treslock;                  // 结果队列互斥锁xmutex_treqlock;                  // 请求队列互斥锁xcond_treqwait;                  // 请求等待条件变量etp_workerwrk_first;              // 工作线程链表头节点};

工作线程结构 (struct etp_worker)

/** * 源码位置: etp.c line 120-134 * 实际的工作线程结构 */typedefstructetp_worker{  etp_poolpool;                     // 所属线程池指针structetp_tmpbuftmpbuf;          // 临时缓冲区/* locked by pool->wrklock */structetp_worker*prev*next;    // 双向链表指针xthread_ttid;                     // 线程ID#ifdefETP_WORKER_COMMONETP_WORKER_COMMON// 可选的通用字段#endifetp_worker;

请求队列结构 (etp_reqq)

/** * 源码位置: etp.c line 110-115 * 多优先级请求队列实现 */typedefstruct{  ETP_REQ*qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */intsize; } etp_reqq;

🔧 线程池初始化完整流程

1. 入口函数 eio_init

/** * 源码位置: eio.c line 1846-1851 * libeio对外暴露的初始化接口 */intecb_coldeio_init (void (*want_poll)(void), void (*done_poll)(void)) {  eio_want_poll_cb=want_poll;      // 设置轮询需求回调eio_done_poll_cb=done_poll;      // 设置轮询完成回调returnetp_init (EIO_POOL000);  // 调用ETP初始化}

2. 核心初始化函数 etp_init

/** * 源码位置: etp.c line 287-315 * ETP线程池初始化实现 */ETP_API_DECLintecb_coldetp_init (etp_poolpoolvoid*userdatavoid (*want_poll)(void*userdata), void (*done_poll)(void*userdata)) {  // 🔒 创建各种同步原语X_MUTEX_CREATE (pool->wrklock);    // 工作线程锁X_MUTEX_CREATE (pool->reslock);    // 结果队列锁X_MUTEX_CREATE (pool->reqlock);    // 请求队列锁X_COND_CREATE  (pool->reqwait);    // 请求等待条件变量// 📋 初始化请求和结果队列reqq_init (&pool->req_queue);      // 初始化请求队列reqq_init (&pool->res_queue);      // 初始化结果队列// 🔗 初始化工作线程链表pool->wrk_first.next=pool->wrk_first.prev=&pool->wrk_first;  // 📊 初始化计数器和配置参数pool->started=0;                // 已启动线程数pool->idle=0;                // 空闲线程数pool->nreqs=0;                // 总请求数pool->nready=0;                // 就绪请求数pool->npending=0;                // 挂起请求数pool->wanted=4;                // 期望的线程数// ⚙️ 设置线程池参数pool->max_idle=4;                // 最大空闲线程数pool->idle_timeout=10;           // 空闲超时时间(秒)// 🎯 设置回调函数pool->userdata=userdata;  pool->want_poll_cb=want_poll;  pool->done_poll_cb=done_poll;  return0; }

3. 队列初始化函数 reqq_init

/** * 源码位置: etp.c line 233-242 * 请求队列初始化实现 */staticvoidecb_coldreqq_init (etp_reqq*q) {  intpri;  for (pri=0pri<ETP_NUM_PRI++pri)    q->qs[pri=q->qe[pri=0;     // 初始化各优先级队列为空q->size=0;                       // 队列大小清零}

4. 线程创建工作流程

/** * 源码位置: etp.c line 442-460 * 线程启动函数实现 */staticvoidecb_coldetp_start_thread (etp_poolpool) {  etp_worker*wrk=calloc (1sizeof (etp_worker));  // 分配工作线程结构wrk->pool=pool;                  // 设置所属线程池X_LOCK (pool->wrklock);            // 获取工作线程锁if (xthread_create (&wrk->tidetp_proc, (void*)wrk))  // 创建线程     {      // 🔗 将新线程加入双向链表wrk->prev=&pool->wrk_first;      wrk->next=pool->wrk_first.next;      pool->wrk_first.next->prev=wrk;      pool->wrk_first.next=wrk;      ++pool->started;               // 增加启动计数     }  elsefree (wrk);                      // 创建失败则释放内存X_UNLOCK (pool->wrklock);          // 释放工作线程锁}

5. 线程入口函数 etp_proc

/** * 源码位置: etp.c line 334-425 * 工作线程主函数实现 */X_THREAD_PROC (etp_proc) {  ETP_REQ*req;                      // 请求指针structtimespects;                // 时间结构体etp_worker*self= (etp_worker*)thr_arg;  // 获取工作线程指针etp_poolpool=self->pool;        // 获取线程池指针etp_proc_init ();                  // 线程初始化/* try to distribute timeouts somewhat evenly */ts.tv_nsec= ((intptr_t)self&1023UL* (1000000000UL / 1024UL);  for (;;)                           // 主循环开始     {      ts.tv_sec=0;      X_LOCK (pool->reqlock);        // 🔒 锁定请求队列for (;;)                       // 请求获取循环         {          req=reqq_shift (&pool->req_queue);  // 从队列获取请求if (ecb_expect_true (req)) // ✅ 获取到请求break;          if (ts.tv_sec==1)        // ⏰ 超时检测,退出线程             {              X_UNLOCK (pool->reqlock);              X_LOCK (pool->wrklock);              --pool->started;              X_UNLOCK (pool->wrklock);               goto quit;             }          ++pool->idle;              // 增加空闲计数if (pool->idle <= pool->max_idle)  // 未超过最大空闲数X_COND_WAIT (pool->reqwaitpool->reqlock);  // 无限期等待else             {              if (!ts.tv_sec)        // 首次设置超时ts.tv_sec=time (0+pool->idle_timeout;              if (X_COND_TIMEDWAIT (pool->reqwaitpool->reqlockts==ETIMEDOUT)                ts.tv_sec=1;       // 超时标记             }          --pool->idle;              // 减少空闲计数         }      --pool->nready;                // 减少就绪计数X_UNLOCK (pool->reqlock);      // 🔓 解锁请求队列if (ecb_expect_false (req->type==ETP_TYPE_QUIT))  // 退出请求         goto quit;      ETP_EXECUTE (selfreq);       // 🎯 执行请求X_LOCK (pool->reslock);        // 🔒 锁定结果队列++pool->npending;              // 增加挂起计数if (!reqq_push (&pool->res_queuereq))  // 推入结果队列ETP_WANT_POLL (pool);        // 触发轮询回调etp_worker_clear (self);       // 清理工作线程状态X_UNLOCK (pool->reslock);      // 🔓 解锁结果队列     }quit:  free (req);                        // 释放请求内存X_LOCK (pool->wrklock);            // 🔒 锁定工作线程etp_worker_free (self);            // 释放工作线程资源X_UNLOCK (pool->wrklock);          // 🔓 解锁工作线程return0; }

🎯 线程创建时机分析

动态线程创建策略

/** * 源码位置: etp.c line 462-487 * 智能线程创建决策函数 */staticvoidetp_maybe_start_thread (etp_poolpool) {  // 📊 检查是否已达到期望线程数if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))    return;  // 🧮 计算是否需要新线程if (ecb_expect_true (0 <= (int)etp_nthreads (pool+ (int)etp_npending (pool- (int)etp_nreqs (pool)))    return;  // 🚀 启动新工作线程etp_start_thread (pool); }

线程数量控制

/** * 源码位置: etp.c line 520-535 * 线程池参数设置函数 */voidetp_set_max_parallel (etp_poolpoolunsigned intnthreads) {  if (nthreads>ETP_MAX_PARALLEL)    nthreads=ETP_MAX_PARALLEL;  pool->wanted=nthreads ? nthreads : 1;  // 设置期望线程数}voidetp_set_max_idle (etp_poolpoolunsigned intnthreads) {  pool->max_idle=nthreads;         // 设置最大空闲线程数}

🔒 同步机制详解

线程抽象层 (xthread.h)

/** * 源码位置: xthread.h * 跨平台线程抽象实现 */staticintxthread_create (xthread_t*tidvoid*(*proc)(void*), void*arg) {  intretval;  sigset_tfullsigsetoldsigset;  pthread_attr_tattr;  pthread_attr_init (&attr);  pthread_attr_setdetachstate (&attrPTHREAD_CREATE_DETACHED);  // 分离线程pthread_attr_setstacksize (&attrPTHREAD_STACK_MIN<X_STACKSIZE ? X_STACKSIZE : PTHREAD_STACK_MIN);  sigfillset (&fullsigset);          // 屏蔽所有信号pthread_sigmask (SIG_SETMASK&fullsigset&oldsigset);  retval=pthread_create (tid&attrprocarg==0;  // 创建线程pthread_sigmask (SIG_SETMASK&oldsigset0);          // 恢复信号屏蔽pthread_attr_destroy (&attr);  returnretval; }

互斥锁和条件变量

/** * 源码位置: xthread.h * POSIX线程同步原语封装 */typedefpthread_mutex_txmutex_t;#defineX_MUTEX_CREATE(mutex)  pthread_mutex_init (&(mutex), 0)#defineX_LOCK(mutex)          pthread_mutex_lock (&(mutex))#defineX_UNLOCK(mutex)        pthread_mutex_unlock (&(mutex))typedefpthread_cond_txcond_t;#defineX_COND_CREATE(cond)    pthread_cond_init (&(cond), 0)#defineX_COND_SIGNAL(cond)    pthread_cond_signal (&(cond))#defineX_COND_WAIT(cond,mutex) pthread_cond_wait (&(cond), &(mutex))#defineX_COND_TIMEDWAIT(cond,mutex,to) pthread_cond_timedwait (&(cond), &(mutex), &(to))

📊 初始化调用关系图

用户调用 eio_init(want_poll, done_poll)            ↓    设置全局回调函数            ↓    调用 etp_init(EIO_POOL, 0, 0, 0)            ↓    创建同步原语 (3个mutex + 1个cond)            ↓    初始化队列 (req_queue, res_queue)            ↓    设置初始参数 (started=0, idle=0, wanted=4)            ↓    设置配置参数 (max_idle=4, idle_timeout=10)            ↓    初始化完成,返回0            ↓    实际线程创建发生在首次提交任务时            ↓    通过 etp_maybe_start_thread() 动态创建

⚠️ 实际使用注意事项

1. 必须提供回调函数

// ❌ 错误示例(会导致程序卡死)eio_init(NULLNULL);  // 源码中未做NULL检查// ✅ 正确示例voidwant_poll_callback(void) { /* 处理轮询需求 */ }voiddone_poll_callback(void) { /* 处理轮询完成 */ }eio_init(want_poll_callbackdone_poll_callback);

2. 线程创建是惰性的

// 线程池初始化时不创建实际线程// 线程会在首次提交任务时按需创建eio_nop(EIO_PRI_DEFAULTcallbackNULL);  // 此时才创建线程

3. 默认配置参数

// 源码中的默认值pool->wanted=4;        // 期望4个工作线程pool->max_idle=4;      // 最多4个空闲线程pool->idle_timeout=10// 空闲10秒后退出

🔍 调试和监控

状态查询函数

// 源码提供的状态查询接口unsigned inteio_nreqs(void);     // 在途请求数unsigned inteio_nready(void);    // 就绪请求数unsigned inteio_npending(void);  // 挂起请求数unsigned inteio_nthreads(void);  // 工作线程数

内部调试信息

// 可通过修改源码添加调试输出printf("线程池状态: started=%u, idle=%u, nreqs=%u\n",       pool->startedpool->idlepool->nreqs);
  • 公众号:安全狗的自我修养

  • vx:2207344074

  • http://gitee.com/haidragon

  • http://github.com/haidragon

  • bilibili:haidragonx

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » libeio库源码分析系列(四)

评论 抢沙发

7 + 9 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮