实验程序:
talker.cc
#include <std_msgs/Int32.h>
#include <ros/ros.h>
int main(int argc, char **argv) {
ros::init(argc, argv, "talker");
ros::NodeHandle nh;
std_msgs::Int32 data;
data.data = 0;
ros::Publisher pub = nh.advertise<std_msgs::Int32>("alan_topic", 1);
ros::Rate loop_rate(1.0); // 1hz
while (ros::ok()) {
data.data++;
pub.publish(data);
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] publish " << data.data << std::endl;
loop_rate.sleep();
}
return 0;
}
listener.cc
#include <ros/ros.h>
#include <std_msgs/Int32.h>
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
return;
}
int main(int argc, char ** argv) {
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
ros::spin();
return 0;
}
实时
listener.cc
#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
sleep(5); // 5秒
return;
}
int main(int argc, char ** argv) {
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
ros::spin();
return 0;
}
回调函数处理时间长导致数据丢失(队列长度为1,处理回调函数时间过长时时,队列里旧数据不断被新数据顶掉,处理完第2帧时队列里数据为7,丢失数据的帧数跟回调处理时间有关,是动态变化的)
listener.cc
#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
sleep(5); // 5秒
return;
}
int main(int argc, char ** argv) {
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
ros::spin();
return 0;
}
只扩大回调队列长度;处理有延时,且有数据丢失
处理第3帧是因为一开始队列为空,第2帧处理完时队列刚满(3,4,5,6,7),队首为3,还没被顶掉
扫描二维码关注公众号,回复:
12877588 查看本文章
处理第14,15,16,17帧是因为talker在publish第17帧后终止,此时队列为(13,14,15,16,17),listener会依次处理直到队列为空
listener.cc
#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
if (data.data % 5 == 0) {
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
sleep(5); // 5秒
}
return;
}
int main(int argc, char ** argv) {
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
ros::spin();
return 0;
}
扩大回调队列长度(队列长度至少设为回调函数最长处理时间 / 订阅的topic的频率),并且做跳帧处理;无数据丢失,且在跳帧处实时
另起一个后端处理线程
listener.cc
#include <unistd.h>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <ros/ros.h>
#include <std_msgs/Int32.h>
std::queue<int> q;
std::mutex m;
std::condition_variable cv;
bool data_ready = false;
void Process() {
while(ros::ok()) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return data_ready;});
std::cout << "queue size: " << q.size() << std::endl;
if (q.empty()) {
continue;
}
int data = q.front();
data_ready = false;
q.pop();
lk.unlock();
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
sleep(5); // 5秒
}
}
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
{
std::lock_guard<std::mutex> lk(m);
q.push(data.data);
data_ready = true;
}
cv.notify_one();
return;
}
int main(int argc, char ** argv) {
std::thread t(Process);
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
ros::spin();
return 0;
}
前端callback实时预处理,后端处理线程延时处理,问题是队列q会不断加长,后端延迟越来越大
listener.cc
#include <unistd.h>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <ros/ros.h>
#include <std_msgs/Int32.h>
std::queue<int> q;
std::mutex m;
std::condition_variable cv;
bool data_ready = false;
void Process() {
int cnt = 0;
std::queue<int> q2;
while(ros::ok()) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return data_ready;});
std::cout << "queue size: " << q.size() << std::endl;
q2 = q;
std::queue<int> empty_q;
std::swap(empty_q, q); // 清空队列
data_ready = false;
lk.unlock();
while (!q2.empty()) {
int data = q2.front();
cnt++;
q2.pop();
if (cnt % 5 == 0) {
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
sleep(5); // 5秒
}
}
}
}
void alan_callback(const std_msgs::Int32& data){
std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
{
std::lock_guard<std::mutex> lk(m);
q.push(data.data);
data_ready = true;
}
cv.notify_one();
return;
}
int main(int argc, char ** argv) {
std::thread t(Process);
ros::init(argc, argv, "listener");
ros::NodeHandle nh;
ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
ros::spin();
return 0;
}
前端callback实时预处理,后端处理线程跳帧处理,队列q保持一定长度,后端在跳帧处实时处理
CMakeLists.txt
cmake_minimum_required(VERSION 2.8.3)
project(ros_test)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")
find_package(catkin REQUIRED COMPONENTS
roscpp
std_msgs
)
include_directories(
${catkin_INCLUDE_DIRS}
)
add_executable(talker src/talker.cc)
target_link_libraries(talker ${catkin_LIBRARIES})
add_executable(listener src/listener.cc)
target_link_libraries(listener ${catkin_LIBRARIES})
note:
回调函数的处理时间不应太长,可只做一些简单的数据处理,然后把数据存到待处理队列,另外起一个后端线程独立处理队列中的数据;callback函数只负责push数据到队列,后端线程pop出数据并进行完整处理(在多线程中访问同一队列记得加锁mutex, condition_variable)