# ROS2基础教程 - 4 话题通信

# 4.1 话题简介

在搭建 ROS2 环境的时候,演示了小海龟案例,其中启动了两个节点,一个是窗口显示小海龟的节点,一个是键盘控制的节点。

键盘为什么能控制小海龟窗口的运动呢,这里就涉及到节点之间的通信。在机器人的组件中,有摄像头、雷达等传感器,这些传感器都是一个个节点,他们会采集数据,并将数据传递给其他一个或多个节点进行显示或处理。

ROS 中节点间的通信有很多方式,例如话题、服务、参数服务器等,其中话题是用的最多的一种通信方式,像摄像头数、雷达等数据都是话题在节点之间进行传输的。


在 ROS2 中,话题(Topics)是一种用于不同节点之间进行异步通信的机制,话题通信的实现依赖于发布/订阅模式,通过底层的 DDS(数据分发服务)来确保消息的可靠传递。在话题通信中,发送数据的对象称为发布者(Publisher),接收数据的对象称为订阅者(Subscriber)。发布者将数据发送到指定的 Topic ,订阅 Topic 的订阅者都可以获取到数据,同时传输的数据也有固定的数据类型。


下面通过一个简单的例子来演示话题的使用。

  • 创建一个发布者,定时发布 Hello World!到指定的 Topic;
  • 创建一个订阅者,订阅该Topic,接收Hello World!

# 4.2 Python实现话题通信

首先在 工作空间/src 创建一个功能包,我这里叫 topic_test_python

ros2 pkg create --build-type ament_python topic_test_python
1

topic_test_python/topic_test_python 创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。


# 1 发布消息节点

首先创建一个节点,用来发布消息。

创建节点文件 hello_publisher.py,内容如下:

import rclpy  # 导入 ROS2 Python 客户端库
from rclpy.node import Node  # 从 rclpy 导入 Node 类
from std_msgs.msg import String  # 导入标准消息类型 String

class PublisherNode(Node):
    def __init__(self):
        super().__init__('hello_publisher')  # 初始化节点,节点名为 my_publisher
        self.publish()     # 发布消息
        
    def publish(self):
        self.publisher_ = self.create_publisher(String, 'my_topic', 10)  # 创建一个发布者,发布 String 类型消息到 my_topic,队列长度为 10
        timer_period = 1  # 设置定时器频率为 1 秒,可以为小数
        self.timer = self.create_timer(timer_period, self.timer_callback)  # 创建定时器,每 1 秒调用一次 timer_callback 方法

    def timer_callback(self):
        msg = String()  # 创建一个 String 类型的消息对象
        msg.data = 'Hello World!'  # 设置消息内容
        self.publisher_.publish(msg)  # 发布消息到 my_topic
        self.get_logger().info(f'Published: {msg.data}')  # 打印发布的消息内容到日志

def main(args=None):
    rclpy.init(args=args)  # 初始化 ROS2
    publisher = PublisherNode()  # 创建节点

    rclpy.spin(publisher)  # 持续运行节点,处理回调

    publisher.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭 ROS2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

在上面的代码中,使用 create_publisher 创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。

# 2 订阅消息节点

上面发布消息,下面创建一个节点用来订阅消息。

创建节点文件 hello_subscriber.py,内容如下:

import rclpy  # 导入 ROS2 Python 客户端库
from rclpy.node import Node  # 从 rclpy 导入 Node 类
from std_msgs.msg import String  # 导入标准消息类型 String

class SubscriberNode(Node):
    def __init__(self):
        super().__init__('hello_subscriber')  # 初始化节点,节点名为 my_subscriber
        self.subscribe()  # 调用订阅

    def subscribe(self):
        self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10)  # 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10

    # 接收消息的回调
    def listener_callback(self, msg):
        self.get_logger().info(f'Received: {msg.data}')  # 打印接收到的消息内容到日志

def main(args=None):
    rclpy.init(args=args)  # 初始化 ROS2
    subscriber = SubscriberNode()  # 创建节点

    rclpy.spin(subscriber)  # 持续运行节点,处理回调

    subscriber.destroy_node()  # 销毁节点
    rclpy.shutdown()  # 关闭 ROS2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

在上面的代码中,使用 create_subscription 创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。

# 3 配置节点

在功能包下的 setup.py 中配置两个节点,在 entry_points 中配置:

entry_points={
    'console_scripts': [
        'hello_publisher = topic_test_python.hello_publisher:main',
        'hello_subscriber= topic_test_python.hello_subscriber:main',
    ],
},
1
2
3
4
5
6

# 4 构建项目

现在代码已经编写完成了,需要构建项目。

工作空间下执行如下命令:

colcon build
1

构建完成,会在 install 目录下生成文件。

# 5 运行节点

首先执行 source 命令,在工作空间下执行:

source install/local_setup.sh
1

因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。


然后打开一个终端,启动发布者节点:

ros2 run topic_test_python hello_publisher
1

此时发布节点就已经在发布消息了:


我们可以打开一个终端,使用如下命令查看发布的消息:

# 查看有哪些topic,可以看到/my_topic
ros2 topic list

# 可以使用echo打印topic中发布的内容
ros2 topic echo /my_topic   # 查看/my_topic的内容
1
2
3
4
5

上面命令会显示 topic 中的内容。


打开另一个终端,运行订阅者节点:

ros2 run topic_test_python hello_subscriber
1

可以看到,终端不停的打印接收到的消息:

# 4.3 C++实现话题通信

首先在 工作空间/src 创建一个功能包,我这里叫 topic_test_cpp

ros2 pkg create --build-type ament_cmake topic_test_cpp
1

topic_test_cpp/src 创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。


# 1 发布消息节点

创建节点文件 hello_publisher.cpp,内容如下:

#include <rclcpp/rclcpp.hpp>  // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp>  // 导入标准消息类型 String

class PublisherNode : public rclcpp::Node {  // 创建一个继承自 rclcpp::Node 的类
public:
    PublisherNode() : Node("hello_publisher") {  // 构造函数,初始化节点名称为 hello_publisher
        publish();  // 调用 publish 方法
    }

    void publish() {  // 定义发布方法
        publisher_ = this->create_publisher<std_msgs::msg::String>("my_topic", 10);  // 创建发布者,发布 String 类型消息到 my_topic,队列长度为 10
        timer_ = this->create_wall_timer(  // 创建定时器,每 1 秒调用一次 timer_callback 方法
            std::chrono::seconds(1),
            std::bind(&PublisherNode::timer_callback, this)  // 绑定成员函数
        );
    }

private:
    void timer_callback() {  // 定时器回调函数
        auto msg = std_msgs::msg::String();  // 创建一个 String 类型的消息对象
        msg.data = "Hello World!";  // 设置消息内容
        publisher_->publish(msg);  // 发布消息到 my_topic
        RCLCPP_INFO(this->get_logger(), "Published: '%s'", msg.data.c_str());  // 打印发布的消息内容到日志
    }

    rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;  // 定义发布者的共享指针
    rclcpp::TimerBase::SharedPtr timer_;  // 定义定时器的共享指针
};

int main(int argc, char * argv[]) {
    rclcpp::init(argc, argv);  // 初始化 ROS2
    auto publisher_node = std::make_shared<PublisherNode>();  // 创建 PublisherNode 实例

    rclcpp::spin(publisher_node);  // 持续运行节点,处理回调

    rclcpp::shutdown();  // 关闭 ROS2
    return 0;  // 返回 0,表示程序正常结束
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

在上面的代码中,使用 create_publisher 创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。

# 2 订阅消息节点

创建节点文件 hello_subscriber.cpp,内容如下:

#include <rclcpp/rclcpp.hpp>  // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp>  // 导入标准消息类型 String

class SubscriberNode : public rclcpp::Node {  // 创建一个继承自 rclcpp::Node 的类
public:
    SubscriberNode() : Node("hello_subscriber") {  // 构造函数,初始化节点名称为 hello_subscriber
        subscribe();  // 调用 subscribe 方法
    }

    void subscribe() {  // 定义订阅方法
        subscription_ = this->create_subscription<std_msgs::msg::String>(  // 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10
            "my_topic",
            10,
            std::bind(&SubscriberNode::listener_callback, this, std::placeholders::_1)  // 指定回调函数
        );
    }

private:
    void listener_callback(const std_msgs::msg::String::SharedPtr msg) {  // 订阅回调函数
        RCLCPP_INFO(this->get_logger(), "Received: '%s'", msg->data.c_str());  // 打印接收到的消息内容到日志
    }

    rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;  // 定义订阅者的共享指针
};

int main(int argc, char * argv[]) {
    rclcpp::init(argc, argv);  // 初始化 ROS2
    auto subscriber_node = std::make_shared<SubscriberNode>();  // 创建 SubscriberNode 实例

    rclcpp::spin(subscriber_node);  // 持续运行节点,处理回调

    rclcpp::shutdown();  // 关闭 ROS2
    return 0;  // 返回 0,表示程序正常结束
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

在上面的代码中,使用 create_subscription 创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。

# 3 配置CMakeLists.txt

在功能包下的 CMakeLists.txt 中,添加节点配置,可以添加在 find_package(ament_cmake REQUIRED) 下面。

因为用到了 String 类型,所以需要配置。

# 找到此包需要的其他包
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)   # 还需要std_msgs

# 添加可执行文件,指向 hello_publisher.cpp
add_executable(hello_publisher src/hello_publisher.cpp)
ament_target_dependencies(hello_publisher rclcpp std_msgs)   # 需要rclcpp std_msgs

# 添加可执行文件,指向 hello_subscriber.cpp
add_executable(hello_subscriber src/hello_subscriber.cpp)
ament_target_dependencies(hello_subscriber rclcpp std_msgs)   # 需要rclcpp std_msgs

# 安装可执行文件
install(TARGETS
  hello_publisher
  hello_subscriber
  DESTINATION lib/${PROJECT_NAME}
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

上面配置了 hello_publisherhello_subscriber 两个节点。

# 4 构建项目

现在代码已经编写完成了,需要构建项目。

工作空间下执行如下命令:

colcon build
1

构建完成,会在 install 目录下生成文件。

# 5 运行节点

首先执行 source 命令,在工作空间下执行:

source install/local_setup.sh
1

因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。


然后打开一个终端,启动发布者节点:

ros2 run topic_test_cpp hello_publisher
1

打开另一个终端,运行订阅者节点:

ros2 run topic_test_cpp hello_subscriber
1

# 4.3 控制小海龟运动

小海龟窗口接收键盘控制节点发送的速度消息,然后进行显示处理。也就是说小海龟窗口订阅了某个话题,键盘控制节点发送速度消息到这个话题,从而实现控制。同样,我们也可以发布消息到这个话题,来实现小海龟的运动。

下面就使用 Python 来实现,发布消息给小海龟,控制小海龟的运动。


# 1 查看节点信息

首先把小海龟窗口节点运行起来:

ros2 run turtlesim turtlesim_node 
1

然后可以使用命令查看有哪些运动的节点:

# 查看所有运行的节点
ros2 node list
1
2

可以使用下面的命令查看 turtlesim_node 节点的详细信息:

ros2 node info /turtlesim
1

显示如下,包括了节点订阅、发布和服务等信息:

可以看到节点订阅了一个 /turtle1/cmd_vel 的 topic,消息类型为 geometry_msgs/msg/Twist ,我们只需要按照格式发布消息即可。

# 2 使用节点控制小海龟

首先在 工作空间/src 创建一个功能包,我这里叫 control_turtle_python

ros2 pkg create --build-type ament_python control_turtle_python
1

control_turtle_python/control_turtle_python 创建一个节点,发布 geometry_msgs/msg/Twist 消息,控制乌龟的运动。

control_turtle.py,内容如下:

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class ControllerTurtle(Node):
    def __init__(self):
        super().__init__('turtle_controller')
        # 创建发布者,发布到 /turtle1/cmd_vel 话题
        self.publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
        # 定时器,用于不停的发布速度消息
        self.timer = self.create_timer(0.1, self.publish_velocity_command)

    # 发布速度消息
    def publish_velocity_command(self):
        # 创建并设置速度消息
        twist = Twist()
        twist.linear.x = 1.0  # 设置线速度,向前运动
        twist.angular.z = 0.5  # 设置角速度,逆时针旋转

        # 发布消息
        self.publisher_.publish(twist)
        self.get_logger().info('Publishing velocity command: linear.x=1.0, angular.z=0.5')

def main(args=None):
    rclpy.init(args=args)
    controller_turtle = ControllerTurtle()

    try:
        rclpy.spin(controller_turtle)
    except KeyboardInterrupt:
        pass

    # 销毁节点并关闭 rclpy
    controller_turtle.destroy_node()
    rclpy.shutdown()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

在上面的节点中,通过定时器,不停的向 /turtle1/cmd_vel 话题,不停的发布速度信息。

上面发布的速度信息包括线速度和角速度,线速度包括 X/Y/Z 轴上的速度,角速度包括 X/Y/Z 轴上的旋转速度,坐标系参见 TF坐标变换 章节。

# 3 配置节点

在功能包下的 setup.py 中配置两个节点,在 entry_points 中配置:

entry_points={
    'console_scripts': [
        'control_turtle = control_turtle_python.control_turtle:main'
    ],
},
1
2
3
4
5

# 4 编译和运行

编译节点后,首先运行小海龟的窗口节点,然后在运行上面的节点。

# 构建项目
colcon build

# 运行小海龟节点
ros2 run turtlesim turtlesim_node

# 运行发布消息节点
ros2 run control_turtle_python control_turtle
1
2
3
4
5
6
7
8

发布消息节点不停的向 /turtle1/cmd_vel 话题发布速度信息:

小海龟窗口接收到速度消息,就会不停的运动:

# 4.4 topic常用的终端命令

上面用到了一些命令,如下:

查看有哪些 topic:

ros2 topic list
1

查看所有运行的节点:

ros2 node list
1

查看节点信息:

ros2 node info 节点名称
1

可以使用 ros2 topic echo 打印指定 topic 中发布的内容:

# 查看/turtle1/cmd_vel话题中发布的内容
ros2 topic echo /turtle1/cmd_vel
1
2

通过上面的的命令,可以查看 /turtle1/cmd_vel 话题中发布的消息(如果有人往这个话题发布信息的话,就可以看到)。


同样,我们也可以使用终端命令,向指定的话题发布消息:

例如我们可以使用终端命令向 /turtle1/cmd_vel 话题中发布速度消息,控制小海龟的运动。

ros2 topic pub -r 10 /turtle1/cmd_vel geometry_msgs/msg/Twist "{\"linear\": {\"x\": 1.0, \"y\": 0.0, \"z\": 0.0}, \"angular\": {\"x\": 0.0, \"y\": 0.0, \"z\": 0.5}}"
1

# 4.5 ROS开发思路

通过话题通信,这里简单介绍一下 ROS 中实现功能的逻辑。

例如想通过摄像头实现图像识别,那么我们首先问一下摄像头的厂商有没有提供 ROS 的依赖包,如果有,那么就可以集成到我们的工作空间中,然后编译运行,这样就可以通过硬件厂商提供的 ROS 包获取到硬件的数据,一般都是以话题的形式发布数据,这样我们就可以通过订阅硬件发布的话题来获取到数据。

如果硬件没有提供 ROS 的依赖包,那么就看一下如何获取到硬件数据,例如如何获取到笔记本的摄像头数据,知道如何获取硬件数据后,我们可以自己封装一个节点,来获取硬件的数据,然后将数据通过话题发布出来。

然后我们就可以再编写另外一个节点,用来实现我们具体想实现的功能,在实现功能的节点中,订阅话题,来获取硬件的数据,例如获取摄像头数据、雷达数据等,获取到数据后,如何实现想要的功能,就和在不在 ROS 中实现区别不大了,你想实现人脸识别或图像识别,获取到摄像头数据,直接去做就可以了。