我们先分析下问题。
如果插入时只需要考虑插入的并发处理,而不需要考虑弹出的影响;
同理,弹出时只需要考虑弹出的并发处理,而不需要考虑插入的影响,
那么问题的复杂度将直接下降一般。
为了分离插入和弹出,我们可否想象成这是在两个链表上操作。
操作两个链表户部影响的本质,其实是在操作两个头指针,对不对。所以没有任何影响。
所以我们需要两个指针,这个指针一个是头,另外一个是尾,这次这两个指针指向了同一个链表,只有当数据只剩下一个时,pop操作才需要更新tail指针,其他情况直接pop即可,而插入根本不需要考虑头指针。
node0 --> node1 --> nullptr
/|\
|
head_ (node0*)
#include <atomic>
#include <stdexcept>
template<typename T>
class LockFreeQueue {
public:
LockFreeQueue() : head_(new Node), tail_(head_.load()) {
}
~LockFreeQueue() {
while (Node* const old_head = head_) {
head_ = old_head->next_;
delete old_head;
}
}
void push(const T& value) {
// 预先准备好待插入的结点
Node* const new_node = new Node(value);
// 记录旧的tail结点
Node* old_tail = tail_.load();
while (true) {
Node* const next = old_tail->next_.load();
if (next == nullptr) {
// 说明此刻tail的值还没被更新,如果不是nullptr,说明这个结点已经有后继结点了,说明别人已经在插入了,甚至已经插入完成
// 如果old的next还是原来的next即nullptr,立刻更新指向新插入的结点,如果不是,说明这个已经不是old_tail了,我们取新的tail
if (old_tail->next_.compare_exchange_weak(next, new_node)) {
tail_.compare_exchange_weak(old_tail, new_node); // tail 还是原来的值,说明没有人插入或者弹出,原子的更新tail的值为新的值,也就是更新里面的结点值,
return;
}
}
else {
// 非空,说明别人也在push, 如果tail_还是旧old_tail_,说明别人也在push,但只进行到更新了old_tail_的next结点这一步,还没有置换新的tail_,我们应该顺应它的行为,帮助它把事情干了。然后我们继续,old_tail的值会被更新成tail_, 开启下一轮重试
tail_.compare_exchange_weak(old_tail, next);
}
}
}
bool pop(T& value) {
Node* old_head = head_.load();
while (true) {
Node* const next = old_head->next_.load();
if (!next) {
// 队列为空,直接返回
return false;
}
if (!next->next_) {
// 如果next是最后一个节点
tail_.compare_exchange_weak(next, old_head); // 更新tail_指针为head_指针
}
//head_还是旧head_,说明无人pop,那就把head_更新为head_的下一个结点
if (head_.compare_exchange_weak(old_head, next)) {
value = next->value_;
delete old_head;
return true;
}
}
}
private:
struct Node {
T value_;
std::atomic<Node*> next_;
Node() : next_(nullptr) {
}
Node(const T& value) : value_(value), next_(nullptr) {
}
};
std::atomic<Node*> head_;
std::atomic<Node*> tail_;
};