mpiUtils
#include <mpi.h>#define COMM_WORLD MpiComm(MPI_COMM_WORLD)
#define MPICHECK(cmd) \
do \
{ \
int e = cmd; \
if (e != MPI_SUCCESS) \
{ \
printf("Failed: MPI error %s:%d '%d'\n", __FILE__, __LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while (0)
// A wrapper of MPI data type. MPI_TYPE_{data_type}
enum MpiType
{
MPI_TYPE_BYTE,
MPI_TYPE_CHAR,
MPI_TYPE_INT,
MPI_TYPE_INT64_T,
MPI_TYPE_UINT32_T,
MPI_TYPE_UNSIGNED_LONG_LONG,
};// A wrapper of MPI_Op type.
enum MpiOp
{
MPI_OP_NULLOP,
MPI_OP_MAX,
MPI_OP_MIN,
MPI_OP_SUM,
MPI_OP_PROD,
MPI_OP_LAND,
MPI_OP_BAND,
MPI_OP_LOR,
MPI_OP_BOR,
MPI_OP_LXOR,
MPI_OP_BXOR,
MPI_OP_MINLOC,
MPI_OP_MAXLOC,
MPI_OP_REPLACE,
};MPI 预定义的操作符(Operations)用于规约(Reduction)类通信操作(如 MPI_Reduce、MPI_Allreduce),它们定义了在通信过程中如何合并来自不同进程的数据。下表详细解释了这些操作符的功能。
MPI_OP_NULL | ||
MPI_MAX | ||
MPI_MIN | ||
MPI_SUM | ||
MPI_PROD | ||
MPI_LAND | ||
MPI_BAND | ||
MPI_LOR | ||
MPI_BOR | ||
MPI_LXOR | ||
MPI_BXOR | ||
MPI_MINLOC | ||
MPI_MAXLOC | ||
MPI_REPLACE |
💻 核心概念与典型应用场景
理解这些操作符的关键点在于它们都是针对每个数据元素独立进行的。如果你规约一个包含多个元素的数组,那么每个位置上的元素会独立地与其它进程中相同位置的元素进行指定的操作。
• 数学运算: MPI_SUM、MPI_PROD、MPI_MAX和MPI_MIN是最常用的操作符,广泛用于计算全局统计量,如所有进程数据的加和、平均值、极值等。• 逻辑与位运算: MPI_LAND、MPI_LOR等用于需要判断多个条件是否同时满足的场景;MPI_BAND、MPI_BOR等则用于直接对数据的二进制位进行操作。• 特殊操作符: • MPI_MINLOC和MPI_MAXLOC在需要定位极值来源时非常有用。例如,在并行寻找一个分布式数组中的全局最大值并知道它位于哪个进程时。• MPI_REPLACE的操作比较特殊,它实际上不是“规约”,而是用某个进程的数据(通常由具体的通信算法决定)去替换其他进程的数据。
📚 如何使用
这些操作符主要作为参数用在 MPI_Reduce 和 MPI_Allreduce 等函数中。
• MPI_Reduce:将来自通信域内所有进程的数据通过指定操作符(如MPI_SUM)合并后,将结果只传递给一个指定的根进程(root process)。// 示例:将所有进程的局部求和 local_sum 汇总到根进程0,得到全局总和 global_sum
MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);• MPI_Allreduce:与MPI_Reduce类似,但规约后的结果会分发给通信域内的所有进程。// 示例:所有进程都得到全局总和 global_sum
MPI_Allreduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
// A wrapper of the level of MPI thread support
enum MpiThreadSupport
{
THREAD_SINGLE,
THREAD_FUNNELED,
THREAD_SERIALIZED,
THREAD_MULTIPLE
};
这段代码定义了一个枚举类型 MpiThreadSupport,它封装了 MPI(Message Passing Interface)库所支持的四种线程支持级别。下面这个表格能帮你快速看懂它们的核心区别:
THREAD_SINGLE | MPI_THREAD_SINGLE | MPI_Init初始化时的默认级别。 |
THREAD_FUNNELED | MPI_THREAD_FUNNELED | |
THREAD_SERIALIZED | MPI_THREAD_SERIALIZED | |
THREAD_MULTIPLE | MPI_THREAD_MULTIPLE |
如何选择与使用
在实际编程中,你需要通过 MPI_Init_thread 函数(而非普通的 MPI_Init)来初始化MPI环境,并请求你需要的线程支持级别。MPI库会返回它实际能提供的级别,这可能等于或低于你请求的级别。
一个典型的使用示例如下:
#include <mpi.h>
int main(int argc, char** argv){
int provided;
// 请求 MPI_THREAD_MULTIPLE 级别
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
// 检查实际获得的级别
if (provided < MPI_THREAD_MULTIPLE) {
// 库可能只支持较低级别,需要调整程序行为
}
// ... 你的并行代码 ...
MPI_Finalize();
return 0;
}重要注意事项
• 级别是递增的:这四种级别在支持能力上是递增的,即 MPI_THREAD_SINGLE<MPI_THREAD_FUNNELED<MPI_THREAD_SERIALIZED<MPI_THREAD_MULTIPLE。MPI实现可能无法提供你所请求的最高级别。• 线程安全责任:即使你获得了 THREAD_MULTIPLE支持,也不意味着所有的MPI调用都是自动线程安全的。当多个线程操作相同的MPI对象(如同一个通信器)时,应用程序自身必须负责通过互斥锁等机制来协调访问,避免冲突。例如,确保集体操作(如MPI_Bcast)在同一通信器上不被并发调用。• 性能考量:更高的线程支持级别(尤其是 THREAD_MULTIPLE)通常会带来额外的同步开销。如果程序不需要多个线程同时进行通信,请求较低的级别可能有助于获得更好的性能。
总结
这个 MpiThreadSupport 枚举为你提供了一种清晰的方式来指定和判断你的MPI程序希望以何种线程模式运行。理解这些级别的差异,并根据应用程序的并行设计(例如,是主从线程模型还是对等的多工作线程模型)来正确选择和请求相应的级别,是编写正确、高效混合式MPI多线程程序的关键第一步。
struct MpiComm
{
MPI_Comm group;
MpiComm(){};
MpiComm(MPI_Comm g)
: group(g){};
};MPI_Datatype getMpiDtype(MpiType dtype)
{
static const std::unordered_map<MpiType, MPI_Datatype> dtype_map{
{MPI_TYPE_BYTE, MPI_BYTE},
{MPI_TYPE_CHAR, MPI_CHAR},
{MPI_TYPE_INT, MPI_INT},
{MPI_TYPE_INT64_T, MPI_INT64_T},
{MPI_TYPE_UINT32_T, MPI_UINT32_T},
{MPI_TYPE_UNSIGNED_LONG_LONG, MPI_UNSIGNED_LONG_LONG},
};
return dtype_map.at(dtype);
}
MPI_Op getMpiOp(MpiOp op)
{
static const std::unordered_map<MpiOp, MPI_Op> op_map{
{MPI_OP_NULLOP, MPI_OP_NULL},
{MPI_OP_MAX, MPI_MAX},
{MPI_OP_MIN, MPI_MIN},
{MPI_OP_SUM, MPI_SUM},
{MPI_OP_PROD, MPI_PROD},
{MPI_OP_LAND, MPI_LAND},
{MPI_OP_BAND, MPI_BAND},
{MPI_OP_LOR, MPI_LOR},
{MPI_OP_BOR, MPI_BOR},
{MPI_OP_LXOR, MPI_LXOR},
{MPI_OP_BXOR, MPI_BXOR},
{MPI_OP_MINLOC, MPI_MINLOC},
{MPI_OP_MAXLOC, MPI_MAXLOC},
{MPI_OP_REPLACE, MPI_REPLACE},
};
return op_map.at(op);
}void initialize(int* argc, char*** argv)
{
MPICHECK(MPI_Init(argc, argv));
}
void finalize()
{
MPICHECK(MPI_Finalize());
}
bool isInitialized()
{
int mpi_initialized = 0;
MPICHECK(MPI_Initialized(&mpi_initialized));
return static_cast<bool>(mpi_initialized);
}
void initThread(int* argc, char*** argv, MpiThreadSupport required, int* provided)
{
switch (required)
{
case THREAD_SINGLE: MPICHECK(MPI_Init_thread(argc, argv, MPI_THREAD_SINGLE, provided)); break;
case THREAD_FUNNELED: MPICHECK(MPI_Init_thread(argc, argv, MPI_THREAD_FUNNELED, provided)); break;
case THREAD_SERIALIZED: MPICHECK(MPI_Init_thread(argc, argv, MPI_THREAD_SERIALIZED, provided)); break;
case THREAD_MULTIPLE: MPICHECK(MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, provided)); break;
default: break;
}
}
int getCommWorldRank()
{
int rank = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
int getCommWorldSize()
{
int world_size = 1;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
return world_size;
}void barrier(MpiComm comm)
{
MPICHECK(MPI_Barrier(comm.group));
}
void barrier()
{
MPICHECK(MPI_Barrier(MPI_COMM_WORLD));
}void bcast(void* buffer, size_t size, MpiType dtype, int root, MpiComm comm)
{
MPICHECK(MPI_Bcast(buffer, size, getMpiDtype(dtype), root, comm.group));
}
void bcast(std::vector<int64_t>& packed, int root, MpiComm comm)
{
int64_t nWords1;
if (getCommWorldRank() == root)
{
nWords1 = static_cast<int64_t>(packed.size());
}
bcast(&nWords1, 1, MPI_TYPE_INT64_T, root, comm);
if (getCommWorldRank() != root)
{
packed.resize(nWords1);
}
bcast(packed.data(), packed.size(), MPI_TYPE_INT64_T, root, comm);
}这段代码展示了针对MPI广播操作的两个封装函数,它们增强了类型安全性和对动态数据结构的支持。下面我们来详细解释其工作原理。
基础广播函数
第一个函数提供了MPI_Bcast的一个基本包装:
void bcast(void* buffer, size_t size, MpiType dtype, int root, MpiComm comm)
{
MPICHECK(MPI_Bcast(buffer, size, getMpiDtype(dtype), root, comm.group));
}• 功能:此函数用于将数据从根进程广播到通信域内的所有其他进程。 • 参数解析: • void* buffer:指向数据缓冲区的指针。对根进程是发送缓冲区,对其他进程是接收缓冲区。• size_t size:要广播的数据元素数量。• MpiType dtype:数据类型(如MPI_TYPE_INT64_T对应int64_t),通过getMpiDtype转换为MPI标准数据类型。• int root:指定广播数据的根进程的秩(rank)。• MpiComm comm:通信域(如MPI_COMM_WORLD),定义了参与广播的进程组。• 关键点: MPICHECK宏可能用于检查MPI_Bcast的返回值(通常是MPI_SUCCESS),并进行错误处理。这是一个阻塞式集体通信操作,意味着通信域内所有进程都必须调用此函数,调用后所有进程的缓冲区数据都与根进程一致。
动态向量的广播函数
第二个函数专门用于广播std::vector<int64_t>,解决了动态容器大小不确定的问题:
void bcast(std::vector<int64_t>& packed, int root, MpiComm comm)
{
int64_t nWords1;
if (getCommWorldRank() == root) {
nWords1 = static_cast<int64_t>(packed.size()); // 根进程获取本地向量大小
}
// 首先广播向量的大小
bcast(&nWords1, 1, MPI_TYPE_INT64_T, root, comm);
if (getCommWorldRank() != root) {
packed.resize(nWords1); // 非根进程根据接收到的大小调整向量
}
// 然后广播向量的实际内容
bcast(packed.data(), packed.size(), MPI_TYPE_INT64_T, root, comm);
}这个函数的关键在于其两步广播机制:
1. 广播向量尺寸:根进程先将其本地向量 packed的大小存入变量nWords1,然后通过第一次bcast调用将这个单一int64_t值广播给所有进程。这样,通信域内的每个进程都知道了需要接收或准备的数据量。2. 调整缓冲区并广播数据:非根进程根据接收到的向量大小 nWords1,调用resize调整自己的packed向量容量,确保有足够空间接收数据。最后,所有进程(包括根进程)再次调用bcast,基于已知的尺寸广播或接收向量的实际内容。
注意事项与优势
• 同步性: MPI_Bcast是集体通信操作,通信域内所有进程(包括根进程)都必须以相同的参数顺序执行该函数,否则可能导致死锁或错误。• 错误处理: MPICHECK宏封装了MPI函数的错误检查,确保广播操作成功执行。• 设计优势:这种封装将底层MPI调用细节隐藏起来,提供了更高级、更安全的接口。特别是对 std::vector的封装,优雅地解决了动态数据结构在广播时大小未知的问题,避免了手动管理内存的麻烦。
总结
总而言之,这段代码通过函数重载提供了两种层次的广播功能:基础版本直接操作内存缓冲区,而针对std::vector<int64_t>的版本通过两步广播(先大小后数据)安全地处理动态容器。这种设计体现了良好的抽象,既保留了MPI的强大功能,又提升了代码的易用性和安全性。
void comm_split(MpiComm comm, int color, int key, MpiComm* newcomm)
{
MPICHECK(MPI_Comm_split(comm.group, color, key, &newcomm->group));
}
这是一个对 MPI 标准库函数 MPI_Comm_split 的封装函数,用于将一个通信器(communicator)中的进程按照特定规则划分为多个不相交的子通信器。这个操作在复杂的并行编程中非常有用,可以实现进程的分组管理 。
函数参数详解
comm | MpiComm | MPI_COMM_WORLD),是需要被分割的进程组 。 |
color | int | 分组标识color 值相同的进程将被划分到同一个新通信器中 。如果设置为 MPI_UNDEFINED,则该进程不属于任何新创建的子通信器,函数返回后 newcomm 将为 MPI_COMM_NULL 。 |
key | int | 排序键值key 值从小到大进行排序;如果 key 值相同,则它们在原始通信器中的相对排名将决定其在新通信器中的顺序 。 |
newcomm | MpiComm* | 输出参数 |
核心机制与工作流程
1. 按颜色分组:函数首先根据每个进程调用时传入的 color值进行分组。所有传入相同color值的进程会被归入同一个新的子通信器 。这样,原始通信器就被分割成了若干个彼此独立的子集 。2. 按键值排序:在每个由相同 color定义的子组内部,进程会按照key参数进行排序,以确定它们在新通信器中的排名(rank)。key值越小的进程,在新通信器中的排名越靠前。如果多个进程指定了相同的key值,那么它们在原始通信器(comm)中的原始排名将作为打破平局的依据 。3. 创建新通信器:完成分组和排序后,函数会为每个唯一的 color值创建一个新的通信器对象,并通过newcomm参数返回给各个进程 。
关键特性与注意事项
• 集体通信操作: MPI_Comm_split是一个集体通信(collective) 函数。这意味着原始通信器comm中的所有进程都必须调用这个函数,即使某些进程通过设置color=MPI_UNDEFINED选择不加入任何新通信器 。• 资源管理:通信器是宝贵的系统资源。当不再需要通过此函数创建的新通信器时,应使用 MPI_Comm_free将其释放,以防止资源泄漏 。• 灵活性:通过巧妙地设置 color和key参数,可以实现非常灵活的进程分组和排名策略。例如,可以将一个二维进程网格按行或按列划分成多个子通信器 。• 错误检查:封装函数中的 MPICHECK宏 likely 用于检查 MPI 函数调用的返回值(如是否为MPI_SUCCESS),并进行统一的错误处理,这增强了代码的健壮性。
应用场景示例
假设有一个并行程序在 MPI_COMM_WORLD 中有 16 个进程,它们逻辑上排列成一个 4x4 的网格。现在需要按行进行通信,例如每行进程独立完成某种计算。可以使用 MPI_Comm_split 为每一行创建一个新的通信器 :
int world_rank, world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// 确定颜色:行号(假设4x4网格,每行4个进程)
int color = world_rank / 4;
// 使用原始排名作为键值,以保持新通信器中的相对顺序
MpiComm row_comm;
comm_split(MPI_COMM_WORLD, color, world_rank, &row_comm);
// 现在,每个进程可以在 row_comm 中与同一行的其他进程通信与其他函数的比较
MPI_Comm_split 是创建新通信器最常用和灵活的方法之一,特别适用于进程间没有完整的组信息,但每个进程都知道自己期望加入的组(通过 color)的情况 。另一种创建通信器的方法是 MPI_Comm_create,它要求所有进程都拥有完整的组成员信息,通常与 MPI_Group 系列函数配合使用 。
总而言之,你提供的这个 comm_split 函数封装了 MPI 强大的进程分组功能,是构建层次化或分组式并行算法的基础工具。
void allreduce(const void* sendbuf, void* recvbuf, int count, MpiType dtype, MpiOp op, MpiComm comm)
{
MPICHECK(MPI_Allreduce(sendbuf, recvbuf, count, getMpiDtype(dtype), getMpiOp(op), comm.group));
}
void allgather(const void* sendbuf, void* recvbuf, int count, MpiType dtype, MpiComm comm)
{
MPICHECK(MPI_Allgather(sendbuf, count, getMpiDtype(dtype), recvbuf, count, getMpiDtype(dtype), comm.group));
}这两段代码是对 MPI(Message Passing Interface)中两个关键集体通信(Collective Communication)操作的封装,用于并行计算中进程间的数据交换与同步。下面我们来详细解析它们。
下表快速对比一下这两个函数的核心特性:
allreduce | allgather | |
|---|---|---|
| 核心功能 | ||
| 通信模式 | ||
| 关键参数 | op | |
| 结果数据 | recvbuf 中都是相同的规约结果。 | recvbuf 中是按进程号顺序拼接的来自所有进程的数据。 |
🔄 全局规约函数 allreduce
这个函数封装了 MPI_Allreduce 操作。
void allreduce(const void* sendbuf, void* recvbuf, int count, MpiType dtype, MpiOp op, MpiComm comm)• 功能:该函数执行一个全局规约操作。它会将通信域 comm内所有进程发送缓冲区(sendbuf)中的数据,通过指定的操作(op,如求和MPI_SUM、求最大值MPI_MAX等)进行合并计算,并将最终结果分发到每一个进程的接收缓冲区(recvbuf)中。这意味着调用完成后,所有进程的recvbuf中都持有完全相同的数据。• 参数解析: • sendbuf: 每个进程提供的输入数据地址。• recvbuf: 存放规约结果的缓冲区地址。• count: 参与规约的数据元素个数。• dtype: 数据类型(如MPI_INT,MPI_DOUBLE)。• op: 规约操作。MPI预定义了多种操作,如MPI_SUM(求和)、MPI_MAX(求最大值)、MPI_MIN(求最小值)等。• comm: 通信域,定义了参与此次集体通信的进程组。• 底层调用: MPI_Allreduce。函数中的getMpiDtype(dtype)和getMpiOp(op)很可能是将自定义的类型和操作枚举转换为标准MPI类型的辅助函数。MPICHECK宏用于检查MPI调用是否成功,通常包含错误处理逻辑。• 应用场景:在分布式计算中非常常用,例如在分布式深度学习的梯度同步阶段,所有工作节点计算完本地梯度后,使用 AllReduce来计算梯度的平均值,并确保所有节点更新为相同的参数。
🗣️ 全局收集函数 allgather
这个函数封装了 MPI_Allgather 操作。
void allgather(const void* sendbuf, void* recvbuf, int count, MpiType dtype, MpiComm comm)• 功能:该函数执行一个全局收集操作。每个进程将自已发送缓冲区( sendbuf)中的数据块发送出去,同时也会接收所有其他进程(包括自身)的数据块。最终,每个进程的接收缓冲区(recvbuf)中都会包含一份来自所有进程数据的完整集合,这些数据按照进程的排名(rank)顺序排列。可以理解为每个进程都作为根进程执行了一次MPI_Gather。• 参数解析: • sendbuf: 当前进程要发送的数据地址。• recvbuf: 接收所有进程数据的缓冲区地址。这个缓冲区必须足够大,至少能容纳count * N个元素(N是通信域内的进程总数)。• count: 每个进程发送的数据元素个数。• dtype: 数据类型。• comm: 通信域。• 底层调用: MPI_Allgather。注意参数中,发送和接收的数据类型被设置为相同的getMpiDtype(dtype),并且发送和接收的数据量都是count。• 应用场景:当需要将分布在各个进程上的数据片段整合成一个完整的数据集,并且每个进程都需要一份完整的副本时,就可以使用此函数。例如,在并行计算中,每个进程处理一部分数据后,需要将全部结果汇总到每个进程进行后续分析。
💡 关键要点与注意事项
1. 集体通信与同步: allreduce和allgather都是集体通信操作。这意味着通信域comm内的所有进程都必须调用相同的函数。如果有一个进程没有调用,程序就可能会死锁。2. 参数一致性:除了所有进程都必须调用外,传递给这些函数的 count,dtype等关键参数必须是“相容的”或一致的。例如,在allgather中,一个进程发送的count必须与其他进程接收的count相匹配。3. 性能考量:MPI实现内部会使用优化的算法(如蝶形连接、递归倍增、Ring算法等)来高效完成这些全局操作,其效率远高于用户自己用点对点通信实现相同功能。 4. 封装的价值:如代码所示,通过 MPICHECK和类型转换函数进行封装,增强了代码的安全性和错误处理能力,使上层应用代码更加简洁和健壮。
参考文献
• https://github.com/NVIDIA/TensorRT-LLM/blob/release/0.5.0/cpp/tensorrt_llm/common/mpiUtils.h

夜雨聆风