# ROS2基础教程 - 4 话题通信
# 4.1 话题简介
在搭建 ROS2 环境的时候,演示了小海龟案例,其中启动了两个节点,一个是窗口显示小海龟的节点,一个是键盘控制的节点。
键盘为什么能控制小海龟窗口的运动呢,这里就涉及到节点之间的通信。在机器人的组件中,有摄像头、雷达等传感器,这些传感器都是一个个节点,他们会采集数据,并将数据传递给其他一个或多个节点进行显示或处理。
ROS 中节点间的通信有很多方式,例如话题、服务、参数服务器等,其中话题是用的最多的一种通信方式,像摄像头数、雷达等数据都是话题在节点之间进行传输的。
在 ROS2 中,话题(Topics)是一种用于不同节点之间进行异步通信的机制,话题通信的实现依赖于发布/订阅模式,通过底层的 DDS(数据分发服务)来确保消息的可靠传递。在话题通信中,发送数据的对象称为发布者(Publisher),接收数据的对象称为订阅者(Subscriber)。发布者将数据发送到指定的 Topic ,订阅 Topic 的订阅者都可以获取到数据,同时传输的数据也有固定的数据类型。
下面通过一个简单的例子来演示话题的使用。
- 创建一个发布者,定时发布 Hello World!到指定的 Topic;
- 创建一个订阅者,订阅该Topic,接收Hello World!
# 4.2 Python实现话题通信
首先在 工作空间/src
创建一个功能包,我这里叫 topic_test_python
:
ros2 pkg create --build-type ament_python topic_test_python
在 topic_test_python/topic_test_python
创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。
# 1 发布消息节点
首先创建一个节点,用来发布消息。
创建节点文件 hello_publisher.py
,内容如下:
import rclpy # 导入 ROS2 Python 客户端库
from rclpy.node import Node # 从 rclpy 导入 Node 类
from std_msgs.msg import String # 导入标准消息类型 String
class PublisherNode(Node):
def __init__(self):
super().__init__('hello_publisher') # 初始化节点,节点名为 my_publisher
self.publish() # 发布消息
def publish(self):
self.publisher_ = self.create_publisher(String, 'my_topic', 10) # 创建一个发布者,发布 String 类型消息到 my_topic,队列长度为 10
timer_period = 1 # 设置定时器频率为 1 秒,可以为小数
self.timer = self.create_timer(timer_period, self.timer_callback) # 创建定时器,每 1 秒调用一次 timer_callback 方法
def timer_callback(self):
msg = String() # 创建一个 String 类型的消息对象
msg.data = 'Hello World!' # 设置消息内容
self.publisher_.publish(msg) # 发布消息到 my_topic
self.get_logger().info(f'Published: {msg.data}') # 打印发布的消息内容到日志
def main(args=None):
rclpy.init(args=args) # 初始化 ROS2
publisher = PublisherNode() # 创建节点
rclpy.spin(publisher) # 持续运行节点,处理回调
publisher.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
在上面的代码中,使用 create_publisher
创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。
# 2 订阅消息节点
上面发布消息,下面创建一个节点用来订阅消息。
创建节点文件 hello_subscriber.py
,内容如下:
import rclpy # 导入 ROS2 Python 客户端库
from rclpy.node import Node # 从 rclpy 导入 Node 类
from std_msgs.msg import String # 导入标准消息类型 String
class SubscriberNode(Node):
def __init__(self):
super().__init__('hello_subscriber') # 初始化节点,节点名为 my_subscriber
self.subscribe() # 调用订阅
def subscribe(self):
self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10) # 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10
# 接收消息的回调
def listener_callback(self, msg):
self.get_logger().info(f'Received: {msg.data}') # 打印接收到的消息内容到日志
def main(args=None):
rclpy.init(args=args) # 初始化 ROS2
subscriber = SubscriberNode() # 创建节点
rclpy.spin(subscriber) # 持续运行节点,处理回调
subscriber.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
在上面的代码中,使用 create_subscription
创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。
# 3 配置节点
在功能包下的 setup.py 中配置两个节点,在 entry_points
中配置:
entry_points={
'console_scripts': [
'hello_publisher = topic_test_python.hello_publisher:main',
'hello_subscriber= topic_test_python.hello_subscriber:main',
],
},
2
3
4
5
6
# 4 构建项目
现在代码已经编写完成了,需要构建项目。
在工作空间下执行如下命令:
colcon build
构建完成,会在 install
目录下生成文件。
# 5 运行节点
首先执行 source
命令,在工作空间下执行:
source install/local_setup.sh
因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。
然后打开一个终端,启动发布者节点:
ros2 run topic_test_python hello_publisher
此时发布节点就已经在发布消息了:
我们可以打开一个终端,使用如下命令查看发布的消息:
# 查看有哪些topic,可以看到/my_topic
ros2 topic list
# 可以使用echo打印topic中发布的内容
ros2 topic echo /my_topic # 查看/my_topic的内容
2
3
4
5
上面命令会显示 topic 中的内容。
打开另一个终端,运行订阅者节点:
ros2 run topic_test_python hello_subscriber
可以看到,终端不停的打印接收到的消息:
# 4.3 C++实现话题通信
首先在 工作空间/src
创建一个功能包,我这里叫 topic_test_cpp
:
ros2 pkg create --build-type ament_cmake topic_test_cpp
在 topic_test_cpp/src
创建两个节点,一个是发布消息的节点,一个是订阅消息的节点。
# 1 发布消息节点
创建节点文件 hello_publisher.cpp
,内容如下:
#include <rclcpp/rclcpp.hpp> // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp> // 导入标准消息类型 String
class PublisherNode : public rclcpp::Node { // 创建一个继承自 rclcpp::Node 的类
public:
PublisherNode() : Node("hello_publisher") { // 构造函数,初始化节点名称为 hello_publisher
publish(); // 调用 publish 方法
}
void publish() { // 定义发布方法
publisher_ = this->create_publisher<std_msgs::msg::String>("my_topic", 10); // 创建发布者,发布 String 类型消息到 my_topic,队列长度为 10
timer_ = this->create_wall_timer( // 创建定时器,每 1 秒调用一次 timer_callback 方法
std::chrono::seconds(1),
std::bind(&PublisherNode::timer_callback, this) // 绑定成员函数
);
}
private:
void timer_callback() { // 定时器回调函数
auto msg = std_msgs::msg::String(); // 创建一个 String 类型的消息对象
msg.data = "Hello World!"; // 设置消息内容
publisher_->publish(msg); // 发布消息到 my_topic
RCLCPP_INFO(this->get_logger(), "Published: '%s'", msg.data.c_str()); // 打印发布的消息内容到日志
}
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_; // 定义发布者的共享指针
rclcpp::TimerBase::SharedPtr timer_; // 定义定时器的共享指针
};
int main(int argc, char * argv[]) {
rclcpp::init(argc, argv); // 初始化 ROS2
auto publisher_node = std::make_shared<PublisherNode>(); // 创建 PublisherNode 实例
rclcpp::spin(publisher_node); // 持续运行节点,处理回调
rclcpp::shutdown(); // 关闭 ROS2
return 0; // 返回 0,表示程序正常结束
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
在上面的代码中,使用 create_publisher
创建了一个发布者,指定了发布消息的类型、topic、队列的长度。同时创建了一个定时器,每秒执行一次,封装了发布消息的对象,使用发布者发布消息。
# 2 订阅消息节点
创建节点文件 hello_subscriber.cpp
,内容如下:
#include <rclcpp/rclcpp.hpp> // 导入 ROS2 C++ 客户端库
#include <std_msgs/msg/string.hpp> // 导入标准消息类型 String
class SubscriberNode : public rclcpp::Node { // 创建一个继承自 rclcpp::Node 的类
public:
SubscriberNode() : Node("hello_subscriber") { // 构造函数,初始化节点名称为 hello_subscriber
subscribe(); // 调用 subscribe 方法
}
void subscribe() { // 定义订阅方法
subscription_ = this->create_subscription<std_msgs::msg::String>( // 创建订阅者,订阅 my_topic,接收 String 类型消息,队列长度为 10
"my_topic",
10,
std::bind(&SubscriberNode::listener_callback, this, std::placeholders::_1) // 指定回调函数
);
}
private:
void listener_callback(const std_msgs::msg::String::SharedPtr msg) { // 订阅回调函数
RCLCPP_INFO(this->get_logger(), "Received: '%s'", msg->data.c_str()); // 打印接收到的消息内容到日志
}
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_; // 定义订阅者的共享指针
};
int main(int argc, char * argv[]) {
rclcpp::init(argc, argv); // 初始化 ROS2
auto subscriber_node = std::make_shared<SubscriberNode>(); // 创建 SubscriberNode 实例
rclcpp::spin(subscriber_node); // 持续运行节点,处理回调
rclcpp::shutdown(); // 关闭 ROS2
return 0; // 返回 0,表示程序正常结束
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
在上面的代码中,使用 create_subscription
创建一个订阅者,指定了消息的类型、topic、回调方法、队列长度,接收到消息后,会调用回调方法。
# 3 配置CMakeLists.txt
在功能包下的 CMakeLists.txt 中,添加节点配置,可以添加在 find_package(ament_cmake REQUIRED)
下面。
因为用到了 String 类型,所以需要配置。
# 找到此包需要的其他包
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED) # 还需要std_msgs
# 添加可执行文件,指向 hello_publisher.cpp
add_executable(hello_publisher src/hello_publisher.cpp)
ament_target_dependencies(hello_publisher rclcpp std_msgs) # 需要rclcpp std_msgs
# 添加可执行文件,指向 hello_subscriber.cpp
add_executable(hello_subscriber src/hello_subscriber.cpp)
ament_target_dependencies(hello_subscriber rclcpp std_msgs) # 需要rclcpp std_msgs
# 安装可执行文件
install(TARGETS
hello_publisher
hello_subscriber
DESTINATION lib/${PROJECT_NAME}
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
上面配置了 hello_publisher
和 hello_subscriber
两个节点。
# 4 构建项目
现在代码已经编写完成了,需要构建项目。
在工作空间下执行如下命令:
colcon build
构建完成,会在 install
目录下生成文件。
# 5 运行节点
首先执行 source
命令,在工作空间下执行:
source install/local_setup.sh
因为是刚构建,打开终端虽然已经source了,但是还没有包括新建的功能包,所以这里还需要 source 一下,后面就不需要了。
然后打开一个终端,启动发布者节点:
ros2 run topic_test_cpp hello_publisher
打开另一个终端,运行订阅者节点:
ros2 run topic_test_cpp hello_subscriber
# 4.3 控制小海龟运动
小海龟窗口接收键盘控制节点发送的速度消息,然后进行显示处理。也就是说小海龟窗口订阅了某个话题,键盘控制节点发送速度消息到这个话题,从而实现控制。同样,我们也可以发布消息到这个话题,来实现小海龟的运动。
下面就使用 Python 来实现,发布消息给小海龟,控制小海龟的运动。
# 1 查看节点信息
首先把小海龟窗口节点运行起来:
ros2 run turtlesim turtlesim_node
然后可以使用命令查看有哪些运动的节点:
# 查看所有运行的节点
ros2 node list
2
可以使用下面的命令查看 turtlesim_node
节点的详细信息:
ros2 node info /turtlesim
显示如下,包括了节点订阅、发布和服务等信息:
可以看到节点订阅了一个 /turtle1/cmd_vel
的 topic,消息类型为 geometry_msgs/msg/Twist
,我们只需要按照格式发布消息即可。
# 2 使用节点控制小海龟
首先在 工作空间/src
创建一个功能包,我这里叫 control_turtle_python
:
ros2 pkg create --build-type ament_python control_turtle_python
在 control_turtle_python/control_turtle_python
创建一个节点,发布 geometry_msgs/msg/Twist
消息,控制乌龟的运动。
control_turtle.py
,内容如下:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class ControllerTurtle(Node):
def __init__(self):
super().__init__('turtle_controller')
# 创建发布者,发布到 /turtle1/cmd_vel 话题
self.publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
# 定时器,用于不停的发布速度消息
self.timer = self.create_timer(0.1, self.publish_velocity_command)
# 发布速度消息
def publish_velocity_command(self):
# 创建并设置速度消息
twist = Twist()
twist.linear.x = 1.0 # 设置线速度,向前运动
twist.angular.z = 0.5 # 设置角速度,逆时针旋转
# 发布消息
self.publisher_.publish(twist)
self.get_logger().info('Publishing velocity command: linear.x=1.0, angular.z=0.5')
def main(args=None):
rclpy.init(args=args)
controller_turtle = ControllerTurtle()
try:
rclpy.spin(controller_turtle)
except KeyboardInterrupt:
pass
# 销毁节点并关闭 rclpy
controller_turtle.destroy_node()
rclpy.shutdown()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
在上面的节点中,通过定时器,不停的向 /turtle1/cmd_vel
话题,不停的发布速度信息。
上面发布的速度信息包括线速度和角速度,线速度包括 X/Y/Z
轴上的速度,角速度包括 X/Y/Z
轴上的旋转速度,坐标系参见 TF坐标变换
章节。
# 3 配置节点
在功能包下的 setup.py 中配置两个节点,在 entry_points
中配置:
entry_points={
'console_scripts': [
'control_turtle = control_turtle_python.control_turtle:main'
],
},
2
3
4
5
# 4 编译和运行
编译节点后,首先运行小海龟的窗口节点,然后在运行上面的节点。
# 构建项目
colcon build
# 运行小海龟节点
ros2 run turtlesim turtlesim_node
# 运行发布消息节点
ros2 run control_turtle_python control_turtle
2
3
4
5
6
7
8
发布消息节点不停的向 /turtle1/cmd_vel
话题发布速度信息:
小海龟窗口接收到速度消息,就会不停的运动:
# 4.4 topic常用的终端命令
上面用到了一些命令,如下:
查看有哪些 topic:
ros2 topic list
查看所有运行的节点:
ros2 node list
查看节点信息:
ros2 node info 节点名称
可以使用 ros2 topic echo
打印指定 topic 中发布的内容:
# 查看/turtle1/cmd_vel话题中发布的内容
ros2 topic echo /turtle1/cmd_vel
2
通过上面的的命令,可以查看 /turtle1/cmd_vel
话题中发布的消息(如果有人往这个话题发布信息的话,就可以看到)。
同样,我们也可以使用终端命令,向指定的话题发布消息:
例如我们可以使用终端命令向 /turtle1/cmd_vel
话题中发布速度消息,控制小海龟的运动。
ros2 topic pub -r 10 /turtle1/cmd_vel geometry_msgs/msg/Twist "{\"linear\": {\"x\": 1.0, \"y\": 0.0, \"z\": 0.0}, \"angular\": {\"x\": 0.0, \"y\": 0.0, \"z\": 0.5}}"
# 4.5 ROS开发思路
通过话题通信,这里简单介绍一下 ROS 中实现功能的逻辑。
例如想通过摄像头实现图像识别,那么我们首先问一下摄像头的厂商有没有提供 ROS 的依赖包,如果有,那么就可以集成到我们的工作空间中,然后编译运行,这样就可以通过硬件厂商提供的 ROS 包获取到硬件的数据,一般都是以话题的形式发布数据,这样我们就可以通过订阅硬件发布的话题来获取到数据。
如果硬件没有提供 ROS 的依赖包,那么就看一下如何获取到硬件数据,例如如何获取到笔记本的摄像头数据,知道如何获取硬件数据后,我们可以自己封装一个节点,来获取硬件的数据,然后将数据通过话题发布出来。
然后我们就可以再编写另外一个节点,用来实现我们具体想实现的功能,在实现功能的节点中,订阅话题,来获取硬件的数据,例如获取摄像头数据、雷达数据等,获取到数据后,如何实现想要的功能,就和在不在 ROS 中实现区别不大了,你想实现人脸识别或图像识别,获取到摄像头数据,直接去做就可以了。
← 03-HelloWorld 05-服务通信 →