乐于分享
好东西不私藏

ROS2 底层源码解析与阅读指导

ROS2 底层源码解析与阅读指导

ROS2 底层源码解析与阅读指导

深入剖析 ROS2 核心层(rcl/rmw/rclcpp/rclpy)的设计与实现

源码版本:ROS2 Humble(2022 LTS)仓库地址:https://github.com/ros2

ROS2 底层源码解析与阅读指导一、ROS2 源码整体架构1.1 分层架构概览1.2 核心仓库索引1.3 推荐阅读顺序二、rcl:C 语言核心层2.1 目录结构2.2 rcl_init() — 全局初始化2.3 rcl_node_init() — 节点创建2.4 rcl Publisher — 发布者实现2.5 rcl_wait() — 等待集机制(核心)三、rclcpp:C++ 客户端库3.1 目录结构3.2 Node 类初始化流程3.3 create_publisher() 模板展开3.4 Executor 执行模型(最关键)3.5 CallbackGroup — 并发控制3.6 IntraProcess 通信(零拷贝)四、rmw:中间件抽象接口4.1 RMW 接口设计4.2 运行时 RMW 选择机制五、rosidl:消息/接口系统5.1 消息生成流程5.2 类型支持句柄六、rclpy:Python 客户端库6.1 Python → C 的绑定架构6.2 rclpy.init() 与 Node 创建6.3 rclpy 执行器七、源码阅读路线图7.1 分层阅读策略7.2 关键问题导航7.3 调试辅助工具八、重要设计模式总结8.1 PIMPL(Pointer to Implementation)8.2 类型擦除(Type Erasure)8.3 等待/通知机制


一、ROS2 源码整体架构

1.1 分层架构概览

1.2 核心仓库索引

1.3 推荐阅读顺序

第一阶段:理解数据流(1-2天)  rcutils → rmw → rcl → rclcpp (Node/Publisher/Subscription)第二阶段:理解通信原理(1天)  rmw_fastrtps → Fast-DDS 适配层第三阶段:理解 Python 绑定(半天)  rclpy → rclpy/impl第四阶段:理解消息系统(半天)  rosidl → rosidl_typesupport → 自动生成代码第五阶段:理解执行模型(1天)  rclcpp/executor → rclcpp/node_options → rclcpp/context

二、rcl:C 语言核心层

2.1 目录结构

2.2 rcl_init() — 全局初始化

// rcl/rcl/src/rcl/init.crcl_ret_trcl_init(intargc,charconst*const*argv,constrcl_init_options_t*options,rcl_context_t*context){// ① 参数校验RCUTILS_CHECK_ARGUMENT_FOR_NULL(optionsRCL_RET_INVALID_ARGUMENT);RCUTILS_CHECK_ARGUMENT_FOR_NULL(contextRCL_RET_INVALID_ARGUMENT);// ② 为 context 分配内部实现结构//    context 是整个 ROS2 实例的全局状态容器//    包含:shutdown 标志、实例 ID、DDS Domain ID、全局参数等context->impl=rcl_get_zero_initialized_context().impl;context->impl= (rcl_context_impl_t*)rmw_allocate(sizeof(rcl_context_impl_t));// ③ 解析命令行参数//    --ros-args 后的参数会被 ROS2 解析,其余传给应用程序//    支持:--remap、--param、--log-level 等ret=rcl_parse_arguments(argcargvallocator&context->global_arguments);// ④ 初始化日志系统(rcutils logging)//    设置日志级别(--ros-args --log-level DEBUG)//    初始化输出处理器(控制台/文件)ret=rcl_logging_configure_with_output_handler(&context->global_arguments,&allocator,rcl_logging_ret_handler);// ⑤ 初始化 RMW 层(调用实际 DDS 实现的 init)//    rmw_init() 创建 DDS DomainParticipant//    DomainParticipant 是 DDS 通信的根对象rmw_init_options_t*init_options=rcl_init_options_get_rmw_init_options(options);ret=rmw_init(init_options&(context->impl->rmw_context));// ⑥ 标记 context 为有效(valid)//    rclcpp::init() 就是对此函数的封装context->instance_id=next_instance_id++;context->impl->is_shutdown=false;returnRCL_RET_OK;}

关键点rcl_context_t 是整个 ROS2 实例的全局状态,每次 rclcpp::init() 创建一个 context,对应底层一个 DDS DomainParticipant

2.3 rcl_node_init() — 节点创建

// rcl/rcl/src/rcl/node.crcl_ret_trcl_node_init(rcl_node_t*node,constchar*name,            // 节点名,如 "my_node"constchar*namespace_,      // 命名空间,如 "/my_ns"rcl_context_t*context,constrcl_node_options_t*options){// ① 验证节点名合法性(只允许字母数字下划线,不能以数字开头)ret=rcl_validate_node_name(name&validation_result&invalid_index);// ② 构造完整节点名(namespace + "/" + name)//    并处理命名空间规范化(确保以 "/" 开头)node->impl->fq_name=rcutils_join_path(namespace_nameallocator);// ③ 在 RMW 层创建节点(DDS 级别的 Publisher/Subscriber 容器)//    Fast-DDS 实现中,这里创建了一个 DomainParticipant 的子实体node->impl->rmw_node_handle=rmw_create_node(&(context->impl->rmw_context),name,namespace_);// ④ 初始化图变化通知机制(guard_condition)//    用于在话题图变化时唤醒等待的线程node->impl->graph_guard_condition=rmw_create_guard_condition(&(context->impl->rmw_context));// ⑤ 初始化参数系统(Parameter Server)//    ROS2 中每个节点有自己独立的参数服务,不是全局的//    基于 rcl_service(服务通信)实现if (options->use_global_arguments) {// 解析通过 --ros-args --param key:=value 传入的参数ret=rcl_arguments_get_param_overrides(&options->arguments,&node->impl->parameter_overrides);  }returnRCL_RET_OK;}

2.4 rcl Publisher — 发布者实现

// rcl/rcl/src/rcl/publisher.crcl_ret_trcl_publisher_init(rcl_publisher_t*publisher,constrcl_node_t*node,constrosidl_message_type_support_t*type_support,  // 消息类型支持(自动生成)constchar*topic_name,constrcl_publisher_options_t*options)             // QoS 等选项{// ① 解析话题名(处理相对/绝对/私有命名空间)//    "scan"       → "/namespace/scan"(相对)//    "/scan"      → "/scan"(绝对)//    "~/scan"     → "/namespace/node_name/scan"(私有)char*remapped_topic_name=NULL;ret=rcl_remap_topic_name(&node->impl->options.arguments,&node->context->global_arguments,topic_name,rcl_node_get_name(node),rcl_node_get_namespace(node),allocator,&remapped_topic_name);// ② 在 RMW 层创建发布者//    Fast-DDS 实现:创建 DDS DataWriter//    DataWriter 绑定到 DDS Publisher(与 DomainParticipant 关联)publisher->impl->rmw_handle=rmw_create_publisher(rcl_node_get_rmw_handle(node),type_support,remapped_topic_name,&options->qos,           // QoS 设置(Reliability、Durability 等)&publisher_options);returnRCL_RET_OK;}rcl_ret_trcl_publish(constrcl_publisher_t*publisher,constvoid*ros_message,           // 指向消息结构体的指针rmw_publisher_allocation_t*allocation){// 直接调用 RMW 层发布// RMW 负责:序列化消息(CDR格式) → 交给 DDS DataWriter → 通过网络发送returnrmw_publish(publisher->impl->rmw_handleros_messageallocation);}

2.5 rcl_wait() — 等待集机制(核心)

rcl_wait() 是 ROS2 执行器的心脏,所有回调都通过它触发:

// rcl/rcl/src/rcl/wait.crcl_ret_trcl_wait(rcl_wait_set_t*wait_set,int64_ttimeout)              // 超时(纳秒),-1 = 无限等待,0 = 非阻塞{// wait_set 是一个容器,包含://   - subscriptions[]    ← 等待接收消息的订阅者//   - guard_conditions[] ← 等待被唤醒的条件变量//   - timers[]           ← 等待超时的定时器//   - clients[]          ← 等待服务响应的客户端//   - services[]         ← 等待服务请求的服务端//   - events[]           ← 等待 QoS 事件(deadline 违反等)// ① 将所有等待实体提交给 RMW 层//    RMW 将其转换为底层 DDS WaitSetret=rmw_wait(wait_set->impl->rmw_subscriptions,wait_set->impl->rmw_guard_conditions,wait_set->impl->rmw_services,wait_set->impl->rmw_clients,wait_set->impl->rmw_events,wait_set->impl->rmw_wait_set,timeout_time);// ② rmw_wait() 返回后://    DDS 已将有数据的实体标记出来//    NULL 表示无数据,非 NULL 表示有待处理数据// ③ 检查并标记定时器是否到期for (size_ti=0i<wait_set->size_of_timers++i) {if (wait_set->timers[i&&rcl_timer_is_ready(wait_set->timers[i])) {// 定时器就绪,保留在 wait_set 中    } else {wait_set->timers[i=NULL;  // 未就绪,清空    }  }// ④ 返回后,调用方(Executor)遍历 wait_set//    非 NULL 的实体表示有待处理的数据/事件//    Executor 逐一调用对应的回调函数returnRCL_RET_OK;}

三、rclcpp:C++ 客户端库

3.1 目录结构

3.2 Node 类初始化流程

// rclcpp/rclcpp/src/rclcpp/node.cppNode::Node(  const std::string & node_name,  const NodeOptions & options): Node(node_name, "", options){}Node::Node(  const std::string & node_name,  const std::string & namespace_,  const NodeOptions & options): node_base_(new rclcpp::node_interfaces::NodeBase(    node_name, namespace_,     options.context(),        // 使用哪个 Context(默认全局)    *(options.get_rcl_node_options()),    options.use_intra_process_comms(),    options.enable_topic_statistics())),  node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())),  node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())),  node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())),  node_topics_(new rclcpp::node_interfaces::NodeTopics(    node_base_.get(), node_timers_.get())),  node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())),  node_clock_(new rclcpp::node_interfaces::NodeClock(    node_base_.get(), node_topics_.get(),    node_graph_.get(), node_services_.get(),    node_logging_.get())),  node_parameters_(new rclcpp::node_interfaces::NodeParameters(    node_base_.get(), node_logging_.get(),    node_topics_.get(), node_services_.get(),    node_clock_.get(),    options.parameter_overrides(),    options.start_parameter_services(),    options.start_parameter_event_publisher(),    options.parameter_event_qos(),    options.parameter_event_publisher_options(),    options.allow_undeclared_parameters(),    options.automatically_declare_parameters_from_overrides())),  node_time_source_(...){  // Node 使用接口分离(Interface Segregation)设计模式  // 每个 node_xxx_ 对应一个功能接口:  //   node_base_:       rcl_node_t 生命周期管理  //   node_graph_:      话题图查询(get_topic_names_and_types)  //   node_logging_:    日志(RCLCPP_INFO 等宏使用)  //   node_timers_:     定时器管理  //   node_topics_:     Publisher/Subscription 管理  //   node_services_:   Service/Client 管理  //   node_parameters_: 参数服务(declare/get/set parameter)  //   node_clock_:      时钟(Wall/Steady/ROS)}

3.3 create_publisher() 模板展开

// rclcpp/rclcpp/include/rclcpp/node_impl.hpptemplate<typename MessageT, typename AllocatorT, typename PublisherT>std::shared_ptr<PublisherT>Node::create_publisher(  const std::string & topic_name,  const rclcpp::QoS & qos,  const PublisherOptionsWithAllocator<AllocatorT> & options){  return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(    *this,           // Node 引用(提供命名空间、context 等)    topic_name,    qos,    options);}// rclcpp/rclcpp/include/rclcpp/create_publisher.hpptemplate<typename MessageT, ...>std::shared_ptr<PublisherT>create_publisher(  NodeT & node,  const std::string & topic_name,  const rclcpp::QoS & qos,  ...){  // ① 获取消息类型支持(由 rosidl 自动生成)  //    类型支持包含:序列化/反序列化函数、类型名称、MD5 等  auto type_support = rclcpp::get_message_type_support_handle<MessageT>();  // ② 获取进程内通信管理器(IntraProcess)  //    如果启用了进程内通信,同进程内的 Pub/Sub 可以零拷贝通信  auto ipm = node.get_node_base_interface()                  ->get_context()                  ->impl_->ipm_weak_ptr_.lock();  // ③ 创建 rcl publisher(调用 rcl_publisher_init)  auto publisher = std::make_shared<PublisherT>(    node.get_node_base_interface().get(),    type_support,    topic_name,    qos_profile,    // rmw_qos_profile_t(从 rclcpp::QoS 转换)    options);  // ④ 如果启用进程内通信,向 IntraProcessManager 注册  if (ipm && options.use_intra_process_comms) {    publisher->setup_intra_process(ipm, ...);  }  return publisher;}

3.4 Executor 执行模型(最关键)

Executor 是 ROS2 中最核心但也最容易误解的组件:

// rclcpp/rclcpp/src/rclcpp/executors/single_threaded_executor.cppvoidSingleThreadedExecutor::spin(){  // 检查是否已有 spin 在运行(防止重入)  if (spinning.exchange(true)) {    throw std::runtime_error("spin() called while already spinning");  }  RCLCPP_SCOPE_EXIT(this->spinning.store(false););  // 主循环  while (rclcpp::ok(this->context_) && spinning.load()) {    rclcpp::AnyExecutable any_executable;    // ① 尝试获取一个可执行的任务(非阻塞)    if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) {      // ② 执行任务(调用实际的回调函数)      execute_any_executable(any_executable);    }  }}// rclcpp/rclcpp/src/rclcpp/executor.cppboolExecutor::get_next_executable(  AnyExecutable & any_executable,  std::chrono::nanoseconds timeout){  bool success = false;  // ① 从已就绪队列中取(优先处理上次 wait 发现的就绪实体)  success = get_next_ready_executable(any_executable);  if (!success) {    // ② 队列为空,调用 wait_for_work 等待新数据    //    内部调用 rcl_wait(),最终调用 DDS WaitSet::wait()    wait_for_work(timeout);    // ③ wait 返回后,再次尝试获取就绪实体    success = get_next_ready_executable(any_executable);  }  return success;}voidExecutor::wait_for_work(std::chrono::nanoseconds timeout){  // ① 收集所有需要等待的实体(订阅者、定时器、服务等)  memory_strategy_->collect_entities(entities_need_rebuild_);  // ② 将实体加入 rcl_wait_set  //    wait_set 相当于 select/epoll 的 fd_set  add_handles_to_wait_set();  // ③ 调用 rcl_wait()(阻塞直到有数据或超时)  rcl_ret_t status = rcl_wait(&wait_set_, timeout_ns);  // ④ 收集已就绪的实体加入 ready_executables_(就绪队列)  memory_strategy_->get_next_subscription(any_exec, weak_nodes_);  memory_strategy_->get_next_timer(any_exec, weak_nodes_);  // ...}voidExecutor::execute_any_executable(AnyExecutable & any_exec){  if (!any_exec.subscription && !any_exec.subscription_intra_process) {    if (any_exec.timer) {      // 执行定时器回调      execute_timer(any_exec.timer);      return;    }    if (any_exec.service) {      execute_service(any_exec.service);      return;    }    if (any_exec.client) {      execute_client(any_exec.client);      return;    }    if (any_exec.waitable) {      any_exec.waitable->execute(any_exec.data);      return;    }    return;  }  // 执行话题订阅回调  execute_subscription(any_exec.subscription);}voidExecutor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription){  // ① 从 DDS DataReader 取出数据(调用 rcl_take)  auto message = subscription->create_message();  rmw_message_info_t message_info;  auto ret = rcl_take(    subscription->get_subscription_handle().get(),    message.get(),    &message_info,    nullptr);  if (RCL_RET_OK == ret) {    // ② 调用用户的回调函数    subscription->handle_message(message, message_info);    // handle_message 最终调用用户在 create_subscription 时传入的 lambda/函数  }}

3.5 CallbackGroup — 并发控制

// rclcpp/rclcpp/include/rclcpp/callback_group.hpp// ROS2 的回调组类型:enum class CallbackGroupType{  // 互斥组:同一时刻只有一个回调在执行  // 用于:有共享状态、不是线程安全的回调  MutuallyExclusive,  // 可重入组:允许多个回调并发执行  // 用于:无状态或线程安全的回调(如纯计算节点)  Reentrant};// 用法示例:class MyNode : public rclcpp::Node {  MyNode() : Node("my_node") {    // 创建互斥回调组(默认类型)    cb_group_mutex_ = create_callback_group(      rclcpp::CallbackGroupType::MutuallyExclusive);    // 创建可重入回调组    cb_group_reentrant_ = create_callback_group(      rclcpp::CallbackGroupType::Reentrant);    // 将订阅者绑定到特定回调组    auto options = rclcpp::SubscriptionOptions();    options.callback_group = cb_group_mutex_;    sub_ = create_subscription<std_msgs::msg::String>(      "topic", 10,      std::bind(&MyNode::callback, this, _1),      options);  // ← 绑定到互斥组    // 与 MultiThreadedExecutor 配合使用才有意义    // SingleThreadedExecutor 中回调组无效(本来就单线程)  }};

3.6 IntraProcess 通信(零拷贝)

// rclcpp/rclcpp/include/rclcpp/intra_process_manager.hpp// 进程内通信(Intra-Process Communication)// 同一进程内的 Pub/Sub 直接共享内存,避免序列化/网络开销// 启用方式:auto options = rclcpp::NodeOptions()  .use_intra_process_comms(true);  // 为此节点启用进程内通信auto node = std::make_shared<MyNode>(options);// 内部原理:// 1. Publisher 发布消息时://    IntraProcessManager 检查是否有同进程的 Subscriber 订阅此话题//    如果有:将消息指针直接存入 ring buffer,不序列化//    如果无:走正常 DDS 路径// 2. 发布接口区分://    publish(msg)               → 拷贝语义(外部可继续使用 msg)//    publish(std::move(msg))    → 移动语义(零拷贝,msg 所有权转移)//    publish(std::unique_ptr)   → 零拷贝(推荐高性能场景)// 3. 类型约束://    进程内通信要求:Publisher 和 Subscriber 的 QoS 兼容//    且 Subscriber 必须使用 unique_ptr 回调形式

四、rmw:中间件抽象接口

4.1 RMW 接口设计

// rmw/rmw/include/rmw/rmw.h(接口层)// RMW 是纯 C 接口,定义了一套"虚函数表"// 每个 DDS 实现提供这套接口的具体实现// 发布者相关rmw_publisher_t * rmw_create_publisher(  const rmw_node_t * node,  const rosidl_message_type_support_t * type_support,  const char * topic_name,  const rmw_qos_profile_t * qos_profile,  const rmw_publisher_options_t * publisher_options);rmw_ret_t rmw_publish(  const rmw_publisher_t * publisher,  const void * ros_message,  rmw_publisher_allocation_t * allocation);// 订阅者相关rmw_subscription_t * rmw_create_subscription(...);rmw_ret_t rmw_take(  const rmw_subscription_t * subscription,  void * ros_message,  bool * taken,  rmw_subscription_allocation_t * allocation);// 等待集(核心!)rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions);rmw_ret_t rmw_wait(  rmw_subscriptions_t * subscriptions,  rmw_guard_conditions_t * guard_conditions,  rmw_services_t * services,  rmw_clients_t * clients,  rmw_events_t * events,  rmw_wait_set_t * wait_set,  const rmw_time_t * wait_timeout);  // 超时

4.2 运行时 RMW 选择机制

# 查看已安装的 RMW 实现ros2 doctor --report | grep RMW# 切换 RMW 实现(环境变量)export RMW_IMPLEMENTATION=rmw_cyclonedds_cppros2 run demo_nodes_cpp talkerexport RMW_IMPLEMENTATION=rmw_fastrtps_cppros2 run demo_nodes_cpp listener
rmw_implementation/src/functions.cpp运行时动态加载 RMW 实现的机制1. 读取环境变量 RMW_IMPLEMENTATION2. 用 dlopen() 加载对应的共享库   rmw_fastrtps_cpp → librmw_fastrtps_cpp.so   rmw_cyclonedds_cpp → librmw_cyclonedds_cpp.so3. 用 dlsym() 获取各 rmw_* 函数的地址4. 将函数指针存入全局函数表5. 后续所有 rmw_* 调用通过函数表分发这就是为什么:同一份 rclcpp/rcl 代码可以在不同 DDS 实现上运行只需要在启动时设置环境变量即可

五、rosidl:消息/接口系统

5.1 消息生成流程

用户定义 → 代码生成 → 类型支持geometry_msgs/msg/Point.msg:  float64 x  float64 y  float64 z  ↓ rosidl_generator_cgeometry_msgs/msg/detail/point__struct.h:  typedef struct geometry_msgs__msg__Point {    double x;    double y;    double z;  } geometry_msgs__msg__Point;↓ rosidl_generator_cppgeometry_msgs/msg/point.hpp:  namespace geometry_msgs { namespace msg {    struct Point {      double x = 0.0;      double y = 0.0;      double z = 0.0;    };  }} ↓ rosidl_typesupport_fastrtps_cpp序列化/反序列化代码(CDR 格式):  void serialize(const Point & msg, eprosima::fastcdr::Cdr & ser) {    ser << msg.x;    ser << msg.y;    ser << msg.z;  }

5.2 类型支持句柄

// 消息类型支持是 rclcpp 与 rmw 之间的桥梁// 获取消息类型支持(模板实例化,编译期确定)#include "geometry_msgs/msg/point.hpp"#include "rosidl_runtime_cpp/message_type_support_decl.hpp"const rosidl_message_type_support_t * type_support =  rosidl_typesupport_cpp::get_message_type_support_handle<    geometry_msgs::msg::Point>();// type_support 结构体包含:// - type_name:          "geometry_msgs/msg/Point"// - typesupport_identifier: "rosidl_typesupport_fastrtps_cpp"// - func:              指向序列化/反序列化函数的指针//   - serialize():     ROS 结构体 → CDR 字节流//   - deserialize():   CDR 字节流 → ROS 结构体//   - get_serialized_size(): 计算序列化后的大小

六、rclpy:Python 客户端库

6.1 Python → C 的绑定架构

rclpy 的调用链:Python 代码  ↓ 纯 Python 封装(rclpy/node.py 等)  ↓ Python C 扩展(_rclpy.so)    ↓ PyO3/CFFI 调用 rcl C API      ↓ rcl(C 语言层)        ↓ rmw(C 中间件接口)          ↓ DDS 实现

6.2 rclpy.init() 与 Node 创建

# rclpy/rclpy/__init__.pydef init(*, args=None, context=None, domain_id=None):    """    Python 入口,最终调用 rcl_init()    """    context = context or get_default_context()    # 调用 C 扩展模块中的 rclpy_init    # _rclpy.rclpy_init() → rcl_init()    context._handle = _rclpy.rclpy_init(        args if args is not None else sys.argv,        domain_id)# rclpy/rclpy/node.py(核心类)class Node:    def __init__(        self,        node_name: str,        *,        context: Context = None,        cli_args: List[str] = None,        namespace: str = None,        use_global_arguments: bool = True,        enable_rosout: bool = True,        start_parameter_services: bool = True,        parameter_overrides: List[Parameter] = None,        allow_undeclared_parameters: bool = False,        automatically_declare_parameters_from_overrides: bool = False,        qos_overrides_options: QoSOverridesOptions = None,    ):        # 处理 CLI 参数(支持 --ros-args --remap 等)        node_cli_args = cli_args if cli_args is not None else []        # 调用 C 扩展创建 rcl_node_t        self.__handle = _rclpy.rclpy_create_node(            node_name,            namespace if namespace is not None else '',            context.handle,            node_cli_args,            use_global_arguments,            enable_rosout)        # 初始化参数服务        self.__parameters: Dict[str, Parameter] = {}        self._parameter_callbacks: List[Callable] = []        if start_parameter_services:            self.__parameter_service = ParameterService(self)            self.__parameter_event_publisher = self.__create_parameter_event_publisher()    def create_publisher(        self,        msg_type,        topic: str,        qos_profile: Union[QoSProfile, int],        *,        callback_group: CallbackGroup = None,        event_callbacks: PublisherEventCallbacks = None,        qos_overriding_options: QoSOverridingOptions = None,    ) -> Publisher:        # 解析 QoS(支持直接传 int 作为 queue_size)        qos_profile = self._validate_qos_or_depth_is_int(qos_profile)        # 创建 Publisher 对象        # 内部调用 _rclpy.rclpy_create_publisher() → rcl_publisher_init()        publisher = Publisher(self, msg_type, topic, qos_profile, event_callbacks)        # 注册到节点的发布者列表        self.__publishers.append(weakref.ref(publisher))        return publisher    def create_subscription(        self,        msg_type,        topic: str,        callback: Callable,        qos_profile: Union[QoSProfile, int],        *,        callback_group: CallbackGroup = None,        event_callbacks: SubscriptionEventCallbacks = None,    ) -> Subscription:        qos_profile = self._validate_qos_or_depth_is_int(qos_profile)        # Subscription 内部创建 rcl_subscription_t        # 并将 callback 注册到 executor 的等待队列        sub = Subscription(            self, msg_type, topic, callback, qos_profile,            raw=False, event_callbacks=event_callbacks)        self.__subscriptions.append(weakref.ref(sub))        # 通知执行器有新的等待实体(触发 guard_condition)        for executor in self.__executors:            executor.wake()        return sub

6.3 rclpy 执行器

# rclpy/rclpy/executors.pyclass SingleThreadedExecutor(Executor):    """    rclpy 默认执行器,等价于 rclcpp::SingleThreadedExecutor    """    def spin(self):        """阻塞运行,直到 rclpy.ok() 返回 False"""        while rclpy.ok(context=self._context):            self.spin_once()    def spin_once(self, timeout_sec: float = None):        """        处理一个待执行的回调(含超时等待)        """        timeout_sec = float(timeout_sec) if timeout_sec is not None else None        # 调用底层 spin_once_until_future_complete        # 或直接调用 _take_and_execute()        self._take_and_execute(timeout_sec=timeout_sec)    def _take_and_execute(self, timeout_sec=None):        """        从等待集取出一个就绪实体并执行其回调        内部调用 rcl_wait()        """        # ① 构建 wait_set(收集所有订阅者/定时器/服务等)        wait_set = self._context.get_wait_set()        # ② 调用 _rclpy.rclpy_wait() → rcl_wait()        _rclpy.rclpy_wait(wait_set, timeout_ns)        # ③ 收集就绪的实体        ready_entities = _rclpy.rclpy_get_ready_entities(wait_set)        # ④ 执行第一个就绪的回调        for entity in ready_entities:            if isinstance(entity, Subscription):                # 取出消息并调用用户回调                msg, info = entity.take()                if msg is not None:                    entity.callback(msg)                break            elif isinstance(entity, Timer):                entity.callback()                break

七、源码阅读路线图

7.1 分层阅读策略

7.2 关键问题导航

通过问题导向阅读,效率最高:

7.3 调试辅助工具

# 查看 rcl 层日志(需设置环境变量)export RCUTILS_LOGGING_BUFFERED_STREAM=1export RCUTILS_CONSOLE_OUTPUT_FORMAT="[{severity}] [{name}]: {message}"ros2 run demo_nodes_cpp talker# 查看 rmw 层调试信息(Fast-DDS)export FASTRTPS_DEFAULT_PROFILES_FILE=rmw_fastrtps_config.xml# 在 XML 中设置 <Log><Verbosity>Info</Verbosity></Log># 使用 ros2_tracing 追踪 rcl/rclcpp 函数调用# 需要安装 ros-humble-tracetools-launchros2 launch tracetools_launch trace.launch.pyros2 run demo_nodes_cpp talker &ros2 run demo_nodes_cpp listener &# 查看 LTTng 追踪结果# 使用 GDB 调试 C++ 节点ros2 run --prefix 'gdb -ex run --args' my_pkg my_node# 打印 wait_set 状态(添加日志到源码)RCLCPP_DEBUG(get_logger(), "wait_set has %zu subscriptions",             wait_set.size_of_subscriptions);

八、重要设计模式总结

8.1 PIMPL(Pointer to Implementation)

// ROS2 到处使用 PIMPL 模式,确保 ABI 稳定性:// node.hpp(公共头文件,用户可见)class Node {public:  Node(const std::string & name, ...);  Publisher<T>::SharedPtr create_publisher(...);  // 只暴露公共 API,不暴露内部成员private:  // 所有内部状态通过 impl 指针访问  std::shared_ptr<NodeBaseInterface> node_base_;  std::shared_ptr<NodeTopicsInterface> node_topics_;  // ...};// 优点:改变内部实现不影响 ABI,用户代码无需重新编译

8.2 类型擦除(Type Erasure)

// Executor 处理不同消息类型的回调,通过类型擦除统一管理:// AnyExecutable 包含各种可执行实体的 base 指针struct AnyExecutable {  rclcpp::SubscriptionBase::SharedPtr subscription;  rclcpp::TimerBase::SharedPtr timer;  rclcpp::ServiceBase::SharedPtr service;  rclcpp::ClientBase::SharedPtr client;  rclcpp::Waitable::SharedPtr waitable;  // 所有类型都继承自各自的 Base 类  // 执行时通过虚函数(handle_message/execute_callback)分发};

8.3 等待/通知机制

// GuardCondition 是 ROS2 中"异步通知"的基础原语// 类似 eventfd 或条件变量,用于唤醒阻塞在 rcl_wait() 中的线程// 使用场景:// 1. 新节点/订阅者加入时,通知 Executor 重建 wait_set// 2. shutdown() 调用时,立即唤醒所有等待的 Executor// 3. 话题图变化时,唤醒 graph_guard_conditionclass GuardCondition {public:  void trigger() {    // 调用 rmw_trigger_guard_condition()    // Fast-DDS 实现:写入一个 DDS DataWriter,    // wait_set 中等待的 DataReader 立即收到通知  }};