ROS2 底层源码解析与阅读指导
ROS2 底层源码解析与阅读指导
深入剖析 ROS2 核心层(rcl/rmw/rclcpp/rclpy)的设计与实现
源码版本:ROS2 Humble(2022 LTS)仓库地址:https://github.com/ros2
ROS2 底层源码解析与阅读指导一、ROS2 源码整体架构1.1 分层架构概览1.2 核心仓库索引1.3 推荐阅读顺序二、rcl:C 语言核心层2.1 目录结构2.2 rcl_init() — 全局初始化2.3 rcl_node_init() — 节点创建2.4 rcl Publisher — 发布者实现2.5 rcl_wait() — 等待集机制(核心)三、rclcpp:C++ 客户端库3.1 目录结构3.2 Node 类初始化流程3.3 create_publisher() 模板展开3.4 Executor 执行模型(最关键)3.5 CallbackGroup — 并发控制3.6 IntraProcess 通信(零拷贝)四、rmw:中间件抽象接口4.1 RMW 接口设计4.2 运行时 RMW 选择机制五、rosidl:消息/接口系统5.1 消息生成流程5.2 类型支持句柄六、rclpy:Python 客户端库6.1 Python → C 的绑定架构6.2 rclpy.init() 与 Node 创建6.3 rclpy 执行器七、源码阅读路线图7.1 分层阅读策略7.2 关键问题导航7.3 调试辅助工具八、重要设计模式总结8.1 PIMPL(Pointer to Implementation)8.2 类型擦除(Type Erasure)8.3 等待/通知机制
一、ROS2 源码整体架构
1.1 分层架构概览

1.2 核心仓库索引
1.3 推荐阅读顺序
第一阶段:理解数据流(1-2天) rcutils → rmw → rcl → rclcpp (Node/Publisher/Subscription)第二阶段:理解通信原理(1天) rmw_fastrtps → Fast-DDS 适配层第三阶段:理解 Python 绑定(半天) rclpy → rclpy/impl第四阶段:理解消息系统(半天) rosidl → rosidl_typesupport → 自动生成代码第五阶段:理解执行模型(1天) rclcpp/executor → rclcpp/node_options → rclcpp/context
二、rcl:C 语言核心层
2.1 目录结构

2.2 rcl_init() — 全局初始化
// rcl/rcl/src/rcl/init.crcl_ret_trcl_init(intargc,charconst*const*argv,constrcl_init_options_t*options,rcl_context_t*context){// ① 参数校验RCUTILS_CHECK_ARGUMENT_FOR_NULL(options, RCL_RET_INVALID_ARGUMENT);RCUTILS_CHECK_ARGUMENT_FOR_NULL(context, RCL_RET_INVALID_ARGUMENT);// ② 为 context 分配内部实现结构// context 是整个 ROS2 实例的全局状态容器// 包含:shutdown 标志、实例 ID、DDS Domain ID、全局参数等context->impl=rcl_get_zero_initialized_context().impl;context->impl= (rcl_context_impl_t*)rmw_allocate(sizeof(rcl_context_impl_t));// ③ 解析命令行参数// --ros-args 后的参数会被 ROS2 解析,其余传给应用程序// 支持:--remap、--param、--log-level 等ret=rcl_parse_arguments(argc, argv, allocator, &context->global_arguments);// ④ 初始化日志系统(rcutils logging)// 设置日志级别(--ros-args --log-level DEBUG)// 初始化输出处理器(控制台/文件)ret=rcl_logging_configure_with_output_handler(&context->global_arguments,&allocator,rcl_logging_ret_handler);// ⑤ 初始化 RMW 层(调用实际 DDS 实现的 init)// rmw_init() 创建 DDS DomainParticipant// DomainParticipant 是 DDS 通信的根对象rmw_init_options_t*init_options=rcl_init_options_get_rmw_init_options(options);ret=rmw_init(init_options, &(context->impl->rmw_context));// ⑥ 标记 context 为有效(valid)// rclcpp::init() 就是对此函数的封装context->instance_id=next_instance_id++;context->impl->is_shutdown=false;returnRCL_RET_OK;}
关键点:rcl_context_t 是整个 ROS2 实例的全局状态,每次 rclcpp::init() 创建一个 context,对应底层一个 DDS DomainParticipant。
2.3 rcl_node_init() — 节点创建
// rcl/rcl/src/rcl/node.crcl_ret_trcl_node_init(rcl_node_t*node,constchar*name, // 节点名,如 "my_node"constchar*namespace_, // 命名空间,如 "/my_ns"rcl_context_t*context,constrcl_node_options_t*options){// ① 验证节点名合法性(只允许字母数字下划线,不能以数字开头)ret=rcl_validate_node_name(name, &validation_result, &invalid_index);// ② 构造完整节点名(namespace + "/" + name)// 并处理命名空间规范化(确保以 "/" 开头)node->impl->fq_name=rcutils_join_path(namespace_, name, allocator);// ③ 在 RMW 层创建节点(DDS 级别的 Publisher/Subscriber 容器)// Fast-DDS 实现中,这里创建了一个 DomainParticipant 的子实体node->impl->rmw_node_handle=rmw_create_node(&(context->impl->rmw_context),name,namespace_);// ④ 初始化图变化通知机制(guard_condition)// 用于在话题图变化时唤醒等待的线程node->impl->graph_guard_condition=rmw_create_guard_condition(&(context->impl->rmw_context));// ⑤ 初始化参数系统(Parameter Server)// ROS2 中每个节点有自己独立的参数服务,不是全局的// 基于 rcl_service(服务通信)实现if (options->use_global_arguments) {// 解析通过 --ros-args --param key:=value 传入的参数ret=rcl_arguments_get_param_overrides(&options->arguments,&node->impl->parameter_overrides); }returnRCL_RET_OK;}
2.4 rcl Publisher — 发布者实现
// rcl/rcl/src/rcl/publisher.crcl_ret_trcl_publisher_init(rcl_publisher_t*publisher,constrcl_node_t*node,constrosidl_message_type_support_t*type_support, // 消息类型支持(自动生成)constchar*topic_name,constrcl_publisher_options_t*options) // QoS 等选项{// ① 解析话题名(处理相对/绝对/私有命名空间)// "scan" → "/namespace/scan"(相对)// "/scan" → "/scan"(绝对)// "~/scan" → "/namespace/node_name/scan"(私有)char*remapped_topic_name=NULL;ret=rcl_remap_topic_name(&node->impl->options.arguments,&node->context->global_arguments,topic_name,rcl_node_get_name(node),rcl_node_get_namespace(node),allocator,&remapped_topic_name);// ② 在 RMW 层创建发布者// Fast-DDS 实现:创建 DDS DataWriter// DataWriter 绑定到 DDS Publisher(与 DomainParticipant 关联)publisher->impl->rmw_handle=rmw_create_publisher(rcl_node_get_rmw_handle(node),type_support,remapped_topic_name,&options->qos, // QoS 设置(Reliability、Durability 等)&publisher_options);returnRCL_RET_OK;}rcl_ret_trcl_publish(constrcl_publisher_t*publisher,constvoid*ros_message, // 指向消息结构体的指针rmw_publisher_allocation_t*allocation){// 直接调用 RMW 层发布// RMW 负责:序列化消息(CDR格式) → 交给 DDS DataWriter → 通过网络发送returnrmw_publish(publisher->impl->rmw_handle, ros_message, allocation);}
2.5 rcl_wait() — 等待集机制(核心)
rcl_wait() 是 ROS2 执行器的心脏,所有回调都通过它触发:
// rcl/rcl/src/rcl/wait.crcl_ret_trcl_wait(rcl_wait_set_t*wait_set,int64_ttimeout) // 超时(纳秒),-1 = 无限等待,0 = 非阻塞{// wait_set 是一个容器,包含:// - subscriptions[] ← 等待接收消息的订阅者// - guard_conditions[] ← 等待被唤醒的条件变量// - timers[] ← 等待超时的定时器// - clients[] ← 等待服务响应的客户端// - services[] ← 等待服务请求的服务端// - events[] ← 等待 QoS 事件(deadline 违反等)// ① 将所有等待实体提交给 RMW 层// RMW 将其转换为底层 DDS WaitSetret=rmw_wait(wait_set->impl->rmw_subscriptions,wait_set->impl->rmw_guard_conditions,wait_set->impl->rmw_services,wait_set->impl->rmw_clients,wait_set->impl->rmw_events,wait_set->impl->rmw_wait_set,timeout_time);// ② rmw_wait() 返回后:// DDS 已将有数据的实体标记出来// NULL 表示无数据,非 NULL 表示有待处理数据// ③ 检查并标记定时器是否到期for (size_ti=0; i<wait_set->size_of_timers; ++i) {if (wait_set->timers[i] &&rcl_timer_is_ready(wait_set->timers[i])) {// 定时器就绪,保留在 wait_set 中 } else {wait_set->timers[i] =NULL; // 未就绪,清空 } }// ④ 返回后,调用方(Executor)遍历 wait_set// 非 NULL 的实体表示有待处理的数据/事件// Executor 逐一调用对应的回调函数returnRCL_RET_OK;}
三、rclcpp:C++ 客户端库
3.1 目录结构

3.2 Node 类初始化流程
// rclcpp/rclcpp/src/rclcpp/node.cppNode::Node( const std::string & node_name, const NodeOptions & options): Node(node_name, "", options){}Node::Node( const std::string & node_name, const std::string & namespace_, const NodeOptions & options): node_base_(new rclcpp::node_interfaces::NodeBase( node_name, namespace_, options.context(), // 使用哪个 Context(默认全局) *(options.get_rcl_node_options()), options.use_intra_process_comms(), options.enable_topic_statistics())), node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())), node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())), node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())), node_topics_(new rclcpp::node_interfaces::NodeTopics( node_base_.get(), node_timers_.get())), node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())), node_clock_(new rclcpp::node_interfaces::NodeClock( node_base_.get(), node_topics_.get(), node_graph_.get(), node_services_.get(), node_logging_.get())), node_parameters_(new rclcpp::node_interfaces::NodeParameters( node_base_.get(), node_logging_.get(), node_topics_.get(), node_services_.get(), node_clock_.get(), options.parameter_overrides(), options.start_parameter_services(), options.start_parameter_event_publisher(), options.parameter_event_qos(), options.parameter_event_publisher_options(), options.allow_undeclared_parameters(), options.automatically_declare_parameters_from_overrides())), node_time_source_(...){ // Node 使用接口分离(Interface Segregation)设计模式 // 每个 node_xxx_ 对应一个功能接口: // node_base_: rcl_node_t 生命周期管理 // node_graph_: 话题图查询(get_topic_names_and_types) // node_logging_: 日志(RCLCPP_INFO 等宏使用) // node_timers_: 定时器管理 // node_topics_: Publisher/Subscription 管理 // node_services_: Service/Client 管理 // node_parameters_: 参数服务(declare/get/set parameter) // node_clock_: 时钟(Wall/Steady/ROS)}
3.3 create_publisher() 模板展开
// rclcpp/rclcpp/include/rclcpp/node_impl.hpptemplate<typename MessageT, typename AllocatorT, typename PublisherT>std::shared_ptr<PublisherT>Node::create_publisher( const std::string & topic_name, const rclcpp::QoS & qos, const PublisherOptionsWithAllocator<AllocatorT> & options){ return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>( *this, // Node 引用(提供命名空间、context 等) topic_name, qos, options);}// rclcpp/rclcpp/include/rclcpp/create_publisher.hpptemplate<typename MessageT, ...>std::shared_ptr<PublisherT>create_publisher( NodeT & node, const std::string & topic_name, const rclcpp::QoS & qos, ...){ // ① 获取消息类型支持(由 rosidl 自动生成) // 类型支持包含:序列化/反序列化函数、类型名称、MD5 等 auto type_support = rclcpp::get_message_type_support_handle<MessageT>(); // ② 获取进程内通信管理器(IntraProcess) // 如果启用了进程内通信,同进程内的 Pub/Sub 可以零拷贝通信 auto ipm = node.get_node_base_interface() ->get_context() ->impl_->ipm_weak_ptr_.lock(); // ③ 创建 rcl publisher(调用 rcl_publisher_init) auto publisher = std::make_shared<PublisherT>( node.get_node_base_interface().get(), type_support, topic_name, qos_profile, // rmw_qos_profile_t(从 rclcpp::QoS 转换) options); // ④ 如果启用进程内通信,向 IntraProcessManager 注册 if (ipm && options.use_intra_process_comms) { publisher->setup_intra_process(ipm, ...); } return publisher;}
3.4 Executor 执行模型(最关键)
Executor 是 ROS2 中最核心但也最容易误解的组件:
// rclcpp/rclcpp/src/rclcpp/executors/single_threaded_executor.cppvoidSingleThreadedExecutor::spin(){ // 检查是否已有 spin 在运行(防止重入) if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } RCLCPP_SCOPE_EXIT(this->spinning.store(false);); // 主循环 while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_executable; // ① 尝试获取一个可执行的任务(非阻塞) if (get_next_executable(any_executable, std::chrono::nanoseconds(-1))) { // ② 执行任务(调用实际的回调函数) execute_any_executable(any_executable); } }}// rclcpp/rclcpp/src/rclcpp/executor.cppboolExecutor::get_next_executable( AnyExecutable & any_executable, std::chrono::nanoseconds timeout){ bool success = false; // ① 从已就绪队列中取(优先处理上次 wait 发现的就绪实体) success = get_next_ready_executable(any_executable); if (!success) { // ② 队列为空,调用 wait_for_work 等待新数据 // 内部调用 rcl_wait(),最终调用 DDS WaitSet::wait() wait_for_work(timeout); // ③ wait 返回后,再次尝试获取就绪实体 success = get_next_ready_executable(any_executable); } return success;}voidExecutor::wait_for_work(std::chrono::nanoseconds timeout){ // ① 收集所有需要等待的实体(订阅者、定时器、服务等) memory_strategy_->collect_entities(entities_need_rebuild_); // ② 将实体加入 rcl_wait_set // wait_set 相当于 select/epoll 的 fd_set add_handles_to_wait_set(); // ③ 调用 rcl_wait()(阻塞直到有数据或超时) rcl_ret_t status = rcl_wait(&wait_set_, timeout_ns); // ④ 收集已就绪的实体加入 ready_executables_(就绪队列) memory_strategy_->get_next_subscription(any_exec, weak_nodes_); memory_strategy_->get_next_timer(any_exec, weak_nodes_); // ...}voidExecutor::execute_any_executable(AnyExecutable & any_exec){ if (!any_exec.subscription && !any_exec.subscription_intra_process) { if (any_exec.timer) { // 执行定时器回调 execute_timer(any_exec.timer); return; } if (any_exec.service) { execute_service(any_exec.service); return; } if (any_exec.client) { execute_client(any_exec.client); return; } if (any_exec.waitable) { any_exec.waitable->execute(any_exec.data); return; } return; } // 执行话题订阅回调 execute_subscription(any_exec.subscription);}voidExecutor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription){ // ① 从 DDS DataReader 取出数据(调用 rcl_take) auto message = subscription->create_message(); rmw_message_info_t message_info; auto ret = rcl_take( subscription->get_subscription_handle().get(), message.get(), &message_info, nullptr); if (RCL_RET_OK == ret) { // ② 调用用户的回调函数 subscription->handle_message(message, message_info); // handle_message 最终调用用户在 create_subscription 时传入的 lambda/函数 }}
3.5 CallbackGroup — 并发控制
// rclcpp/rclcpp/include/rclcpp/callback_group.hpp// ROS2 的回调组类型:enum class CallbackGroupType{ // 互斥组:同一时刻只有一个回调在执行 // 用于:有共享状态、不是线程安全的回调 MutuallyExclusive, // 可重入组:允许多个回调并发执行 // 用于:无状态或线程安全的回调(如纯计算节点) Reentrant};// 用法示例:class MyNode : public rclcpp::Node { MyNode() : Node("my_node") { // 创建互斥回调组(默认类型) cb_group_mutex_ = create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); // 创建可重入回调组 cb_group_reentrant_ = create_callback_group( rclcpp::CallbackGroupType::Reentrant); // 将订阅者绑定到特定回调组 auto options = rclcpp::SubscriptionOptions(); options.callback_group = cb_group_mutex_; sub_ = create_subscription<std_msgs::msg::String>( "topic", 10, std::bind(&MyNode::callback, this, _1), options); // ← 绑定到互斥组 // 与 MultiThreadedExecutor 配合使用才有意义 // SingleThreadedExecutor 中回调组无效(本来就单线程) }};
3.6 IntraProcess 通信(零拷贝)
// rclcpp/rclcpp/include/rclcpp/intra_process_manager.hpp// 进程内通信(Intra-Process Communication)// 同一进程内的 Pub/Sub 直接共享内存,避免序列化/网络开销// 启用方式:auto options = rclcpp::NodeOptions() .use_intra_process_comms(true); // 为此节点启用进程内通信auto node = std::make_shared<MyNode>(options);// 内部原理:// 1. Publisher 发布消息时:// IntraProcessManager 检查是否有同进程的 Subscriber 订阅此话题// 如果有:将消息指针直接存入 ring buffer,不序列化// 如果无:走正常 DDS 路径// 2. 发布接口区分:// publish(msg) → 拷贝语义(外部可继续使用 msg)// publish(std::move(msg)) → 移动语义(零拷贝,msg 所有权转移)// publish(std::unique_ptr) → 零拷贝(推荐高性能场景)// 3. 类型约束:// 进程内通信要求:Publisher 和 Subscriber 的 QoS 兼容// 且 Subscriber 必须使用 unique_ptr 回调形式
四、rmw:中间件抽象接口
4.1 RMW 接口设计
// rmw/rmw/include/rmw/rmw.h(接口层)// RMW 是纯 C 接口,定义了一套"虚函数表"// 每个 DDS 实现提供这套接口的具体实现// 发布者相关rmw_publisher_t * rmw_create_publisher( const rmw_node_t * node, const rosidl_message_type_support_t * type_support, const char * topic_name, const rmw_qos_profile_t * qos_profile, const rmw_publisher_options_t * publisher_options);rmw_ret_t rmw_publish( const rmw_publisher_t * publisher, const void * ros_message, rmw_publisher_allocation_t * allocation);// 订阅者相关rmw_subscription_t * rmw_create_subscription(...);rmw_ret_t rmw_take( const rmw_subscription_t * subscription, void * ros_message, bool * taken, rmw_subscription_allocation_t * allocation);// 等待集(核心!)rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions);rmw_ret_t rmw_wait( rmw_subscriptions_t * subscriptions, rmw_guard_conditions_t * guard_conditions, rmw_services_t * services, rmw_clients_t * clients, rmw_events_t * events, rmw_wait_set_t * wait_set, const rmw_time_t * wait_timeout); // 超时
4.2 运行时 RMW 选择机制
# 查看已安装的 RMW 实现ros2 doctor --report | grep RMW# 切换 RMW 实现(环境变量)export RMW_IMPLEMENTATION=rmw_cyclonedds_cppros2 run demo_nodes_cpp talkerexport RMW_IMPLEMENTATION=rmw_fastrtps_cppros2 run demo_nodes_cpp listener
rmw_implementation/src/functions.cpp运行时动态加载 RMW 实现的机制1. 读取环境变量 RMW_IMPLEMENTATION2. 用 dlopen() 加载对应的共享库 rmw_fastrtps_cpp → librmw_fastrtps_cpp.so rmw_cyclonedds_cpp → librmw_cyclonedds_cpp.so3. 用 dlsym() 获取各 rmw_* 函数的地址4. 将函数指针存入全局函数表5. 后续所有 rmw_* 调用通过函数表分发这就是为什么:同一份 rclcpp/rcl 代码可以在不同 DDS 实现上运行只需要在启动时设置环境变量即可
五、rosidl:消息/接口系统
5.1 消息生成流程
用户定义 → 代码生成 → 类型支持geometry_msgs/msg/Point.msg: float64 x float64 y float64 z ↓ rosidl_generator_cgeometry_msgs/msg/detail/point__struct.h: typedef struct geometry_msgs__msg__Point { double x; double y; double z; } geometry_msgs__msg__Point;↓ rosidl_generator_cppgeometry_msgs/msg/point.hpp: namespace geometry_msgs { namespace msg { struct Point { double x = 0.0; double y = 0.0; double z = 0.0; }; }} ↓ rosidl_typesupport_fastrtps_cpp序列化/反序列化代码(CDR 格式): void serialize(const Point & msg, eprosima::fastcdr::Cdr & ser) { ser << msg.x; ser << msg.y; ser << msg.z; }
5.2 类型支持句柄
// 消息类型支持是 rclcpp 与 rmw 之间的桥梁// 获取消息类型支持(模板实例化,编译期确定)#include "geometry_msgs/msg/point.hpp"#include "rosidl_runtime_cpp/message_type_support_decl.hpp"const rosidl_message_type_support_t * type_support = rosidl_typesupport_cpp::get_message_type_support_handle< geometry_msgs::msg::Point>();// type_support 结构体包含:// - type_name: "geometry_msgs/msg/Point"// - typesupport_identifier: "rosidl_typesupport_fastrtps_cpp"// - func: 指向序列化/反序列化函数的指针// - serialize(): ROS 结构体 → CDR 字节流// - deserialize(): CDR 字节流 → ROS 结构体// - get_serialized_size(): 计算序列化后的大小
六、rclpy:Python 客户端库
6.1 Python → C 的绑定架构
rclpy 的调用链:Python 代码 ↓ 纯 Python 封装(rclpy/node.py 等) ↓ Python C 扩展(_rclpy.so) ↓ PyO3/CFFI 调用 rcl C API ↓ rcl(C 语言层) ↓ rmw(C 中间件接口) ↓ DDS 实现
6.2 rclpy.init() 与 Node 创建
# rclpy/rclpy/__init__.pydef init(*, args=None, context=None, domain_id=None): """ Python 入口,最终调用 rcl_init() """ context = context or get_default_context() # 调用 C 扩展模块中的 rclpy_init # _rclpy.rclpy_init() → rcl_init() context._handle = _rclpy.rclpy_init( args if args is not None else sys.argv, domain_id)# rclpy/rclpy/node.py(核心类)class Node: def __init__( self, node_name: str, *, context: Context = None, cli_args: List[str] = None, namespace: str = None, use_global_arguments: bool = True, enable_rosout: bool = True, start_parameter_services: bool = True, parameter_overrides: List[Parameter] = None, allow_undeclared_parameters: bool = False, automatically_declare_parameters_from_overrides: bool = False, qos_overrides_options: QoSOverridesOptions = None, ): # 处理 CLI 参数(支持 --ros-args --remap 等) node_cli_args = cli_args if cli_args is not None else [] # 调用 C 扩展创建 rcl_node_t self.__handle = _rclpy.rclpy_create_node( node_name, namespace if namespace is not None else '', context.handle, node_cli_args, use_global_arguments, enable_rosout) # 初始化参数服务 self.__parameters: Dict[str, Parameter] = {} self._parameter_callbacks: List[Callable] = [] if start_parameter_services: self.__parameter_service = ParameterService(self) self.__parameter_event_publisher = self.__create_parameter_event_publisher() def create_publisher( self, msg_type, topic: str, qos_profile: Union[QoSProfile, int], *, callback_group: CallbackGroup = None, event_callbacks: PublisherEventCallbacks = None, qos_overriding_options: QoSOverridingOptions = None, ) -> Publisher: # 解析 QoS(支持直接传 int 作为 queue_size) qos_profile = self._validate_qos_or_depth_is_int(qos_profile) # 创建 Publisher 对象 # 内部调用 _rclpy.rclpy_create_publisher() → rcl_publisher_init() publisher = Publisher(self, msg_type, topic, qos_profile, event_callbacks) # 注册到节点的发布者列表 self.__publishers.append(weakref.ref(publisher)) return publisher def create_subscription( self, msg_type, topic: str, callback: Callable, qos_profile: Union[QoSProfile, int], *, callback_group: CallbackGroup = None, event_callbacks: SubscriptionEventCallbacks = None, ) -> Subscription: qos_profile = self._validate_qos_or_depth_is_int(qos_profile) # Subscription 内部创建 rcl_subscription_t # 并将 callback 注册到 executor 的等待队列 sub = Subscription( self, msg_type, topic, callback, qos_profile, raw=False, event_callbacks=event_callbacks) self.__subscriptions.append(weakref.ref(sub)) # 通知执行器有新的等待实体(触发 guard_condition) for executor in self.__executors: executor.wake() return sub
6.3 rclpy 执行器
# rclpy/rclpy/executors.pyclass SingleThreadedExecutor(Executor): """ rclpy 默认执行器,等价于 rclcpp::SingleThreadedExecutor """ def spin(self): """阻塞运行,直到 rclpy.ok() 返回 False""" while rclpy.ok(context=self._context): self.spin_once() def spin_once(self, timeout_sec: float = None): """ 处理一个待执行的回调(含超时等待) """ timeout_sec = float(timeout_sec) if timeout_sec is not None else None # 调用底层 spin_once_until_future_complete # 或直接调用 _take_and_execute() self._take_and_execute(timeout_sec=timeout_sec) def _take_and_execute(self, timeout_sec=None): """ 从等待集取出一个就绪实体并执行其回调 内部调用 rcl_wait() """ # ① 构建 wait_set(收集所有订阅者/定时器/服务等) wait_set = self._context.get_wait_set() # ② 调用 _rclpy.rclpy_wait() → rcl_wait() _rclpy.rclpy_wait(wait_set, timeout_ns) # ③ 收集就绪的实体 ready_entities = _rclpy.rclpy_get_ready_entities(wait_set) # ④ 执行第一个就绪的回调 for entity in ready_entities: if isinstance(entity, Subscription): # 取出消息并调用用户回调 msg, info = entity.take() if msg is not None: entity.callback(msg) break elif isinstance(entity, Timer): entity.callback() break
七、源码阅读路线图
7.1 分层阅读策略

7.2 关键问题导航
通过问题导向阅读,效率最高:

7.3 调试辅助工具
# 查看 rcl 层日志(需设置环境变量)export RCUTILS_LOGGING_BUFFERED_STREAM=1export RCUTILS_CONSOLE_OUTPUT_FORMAT="[{severity}] [{name}]: {message}"ros2 run demo_nodes_cpp talker# 查看 rmw 层调试信息(Fast-DDS)export FASTRTPS_DEFAULT_PROFILES_FILE=rmw_fastrtps_config.xml# 在 XML 中设置 <Log><Verbosity>Info</Verbosity></Log># 使用 ros2_tracing 追踪 rcl/rclcpp 函数调用# 需要安装 ros-humble-tracetools-launchros2 launch tracetools_launch trace.launch.pyros2 run demo_nodes_cpp talker &ros2 run demo_nodes_cpp listener &# 查看 LTTng 追踪结果# 使用 GDB 调试 C++ 节点ros2 run --prefix 'gdb -ex run --args' my_pkg my_node# 打印 wait_set 状态(添加日志到源码)RCLCPP_DEBUG(get_logger(), "wait_set has %zu subscriptions", wait_set.size_of_subscriptions);
八、重要设计模式总结
8.1 PIMPL(Pointer to Implementation)
// ROS2 到处使用 PIMPL 模式,确保 ABI 稳定性:// node.hpp(公共头文件,用户可见)class Node {public: Node(const std::string & name, ...); Publisher<T>::SharedPtr create_publisher(...); // 只暴露公共 API,不暴露内部成员private: // 所有内部状态通过 impl 指针访问 std::shared_ptr<NodeBaseInterface> node_base_; std::shared_ptr<NodeTopicsInterface> node_topics_; // ...};// 优点:改变内部实现不影响 ABI,用户代码无需重新编译
8.2 类型擦除(Type Erasure)
// Executor 处理不同消息类型的回调,通过类型擦除统一管理:// AnyExecutable 包含各种可执行实体的 base 指针struct AnyExecutable { rclcpp::SubscriptionBase::SharedPtr subscription; rclcpp::TimerBase::SharedPtr timer; rclcpp::ServiceBase::SharedPtr service; rclcpp::ClientBase::SharedPtr client; rclcpp::Waitable::SharedPtr waitable; // 所有类型都继承自各自的 Base 类 // 执行时通过虚函数(handle_message/execute_callback)分发};
8.3 等待/通知机制
// GuardCondition 是 ROS2 中"异步通知"的基础原语// 类似 eventfd 或条件变量,用于唤醒阻塞在 rcl_wait() 中的线程// 使用场景:// 1. 新节点/订阅者加入时,通知 Executor 重建 wait_set// 2. shutdown() 调用时,立即唤醒所有等待的 Executor// 3. 话题图变化时,唤醒 graph_guard_conditionclass GuardCondition {public: void trigger() { // 调用 rmw_trigger_guard_condition() // Fast-DDS 实现:写入一个 DDS DataWriter, // wait_set 中等待的 DataReader 立即收到通知 }};
夜雨聆风
