目录
一、需要具备的知识:
单向链表结构
C++11的原子操作atomic
C++11的多线程
二、使用单向链表实现普通队列
template<typename T>
class MyQueue
{
private:
struct Node
{
T _value;
Node* _next;
Node(const T& val = T{}) : _value(val), _next(nullptr) {}
};
Node* _head;
Node* _tail;
public:
MyQueue()
{
Node* node = new Node(T{});
_head = node;
_tail = node;
}
~MyQueue()
{
while (Node* node = _head)
{
_head = _head->_next;
delete node;
}
}
// 在链表头处插入
void enqueue(const T& data)
{
Node* node = new Node(data);
_tail->_next = node;
_tail = node;
}
// 在链表尾部删除
bool dequeue(T& data)
{
Node* head = _head;
Node* next = _head->_next;
if (_head == _tail)
{
if (next == nullptr)
return false;
}
data = next->_value;
_head = next;
delete head;
}
};
三、原子操作atomic
关于atomic的更多详细信息,可以参考std::atomic - cppreference.com
void test_atomic()
{
long la = 10;
long lb = 10;
atomic<long> mylong;
mylong.store(la); /* 意思是 mylong = la */
/* load():获取mylong储存的值,相当于 la == mylong */
if (la == mylong.load())
{
mylong.store(lb); /* mylong = 10 */
}
// 以下两种写法作用相同
mylong.compare_exchange_strong(la, lb);
while (false == mylong.compare_exchange_weak(la, lb));
/* compare_exchange_strong的意思是
* 如果mylong的值和la相同,则把mylong的值赋值为lb,然后返回true
* 如果mylong的值不等于la,则la的值被替换为mylong的值,返回false
* 在执行这个函数的时候,是原子的,不会被其他进程打断和影响
*/
}
四、使用atomic无锁队列
#include <atomic>
#include <thread>
using namespace std;
template<class T>
class LockFreeQueue
{
private:
struct Node
{
T _value;
std::atomic<Node*> _next;
Node(const T& val = T{}) : _value(val), _next(nullptr) {}
};
std::atomic<Node*> _head;
std::atomic<Node*> _tail;
public:
LockFreeQueue()
{
Node* node = new Node(T());
_head.store(node);
_tail.store(node);
}
~LockFreeQueue()
{
while (Node* node = _head)
{
_head.store(node->_next.load());
delete node;
}
}
// 入队操作
void enqueue(const T& data)
{
Node* node = new Node(data); /* 新建节点 */
Node* tail = _tail.load(std::memory_order_relaxed); /* 保存尾节点 */
while (true)
{
Node* next = tail->_next; /* 保存尾结点的下一个节点 */
if (!next) /* 必须等下一个节点是空时才可以插入 */
{
if (tail->_next.compare_exchange_weak(next, node))
{
_tail.compare_exchange_strong(tail, node);
return;
}
}
else /* next不为空,说明别的进程插入了,那么更新临时的尾节点 */
{
_tail.compare_exchange_strong(tail, next);
}
}
}
bool dequeue(T& data)
{
Node* head = nullptr;
while (true)
{
head = _head.load();
Node* tail = _tail.load();
Node* next = head->_next.load();
if (head == _head.load())
{
if (head == tail) /* 队列可能为空 */
{
if (next == nullptr) /* 队列真的为空 */
return false;
// 队列中有入队操作
_tail.compare_exchange_weak(tail, next);
}
else /* 正常出队列 */
{
data = next->_value;
if (_head.compare_exchange_weak(head, next))
break;
}
}
}
delete head;
return true;
}
};
五、测试无锁队列的效果
// 定义一个全局无锁队列变量
LockFreeQueue<People> myque;
// 入队
void thread1()
{
myque.enqueue(People{ 1, "sss" });
myque.enqueue(People{ 2, "sss" });
myque.enqueue(People{ 3, "sss" });
myque.enqueue(People{ 4, "sss" });
myque.enqueue(People{ 5, "sss" });
myque.enqueue(People{ 6, "sss" });
}
// 入队
void thread2()
{
myque.enqueue(People{ 10, "sss" });
myque.enqueue(People{ 20, "sss" });
myque.enqueue(People{ 30, "sss" });
myque.enqueue(People{ 40, "sss" });
myque.enqueue(People{ 50, "sss" });
myque.enqueue(People{ 60, "sss" });
}
// 出队
void thread3()
{
People pp;
while (myque.dequeue(pp))
{
std::cout << pp.age << " " << pp.name << std::endl;
}
}
// 测试无锁队列
int main()
{
std::thread th1(thread1);
std::thread th2(thread2);
std::thread th3(thread3);
th3.join();
th2.join();
th1.join();
return 0;
}