一种控制并发的实现方式
算法来源
这里的代码实现从ceph源码改编过来,这里进行了简化,主要是学习其中所涉及的c++知识点和算法思想
涉及到的知识点
三个关键的函数库:
#include <atomic>
#include <mutex>
#include <condition_variable>
atomic
这个类是对一个值的类型的封装,保证对该值的访问不会引起数据争用,并且可用于在不同线程之间同步内存访问,避免了对该数据的读写还需要手动加锁的麻烦。
mutex
这个类是一个同步原语,可用于保护共享数据免受多个线程同时访问。这里使用的其中std::mutex lock
,std::unique_lock
和std::guard_lock
。(unique_lock和guard_lock的简要区别是unique_lock需要手动解锁,guard_lock在结束所在作用域时自动解锁)
例:
std::mutex lock;
int cnt = 0;
//使用unique_lock对修改cnt加锁
std::unique_lock<std::mutex> l(lock);
cnt++;
l.unlock();
//使用guard_lock对修改cnt加锁
{
//这里使用大括号的原因是将guard_lock的作用域限制仅用于cnt的修改,而不影响别的语句。
std::guard_lock(std::mutex) l(lock);
cnt++;
}
condition_variable
类是同步原语,能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享的条件变量(这里的所说的共享的条件变量为使用std::condition_variable
定义的变量,且在使用范围内都是该变量,以下所说的共享变量即该变量)并通知 condition_variable
。
有意修改共享变量的线程必须
获得
std::mutex
在保有锁时进行修改
在
std::condition_variable
上执行 notify_one 或 notify_all (不需要为通知保有锁)
即使共享变量是由atomic
所封装,也必须在互斥下修改它,以便正确地发布修改到等待的线程。
任何有意在 std::condition_variable
上等待的线程必须
- 获得
std::unique_lock<std::mutex>
,在相同的mutex
上用于保护共享变量 - 执行
wait
,wait_for
或wait_until
,等待操作自动释放mutex
,并悬挂线程的执行。 - condition_variable 被通知时,时限消失或虚假唤醒发生,线程被唤醒,且自动重获得互斥。之后线程应检查条件,若唤醒是虚假的,则继续等待。
std::condition_variable
只可与 std::unique_lock<std::mutex>
一同使用;此限制在一些平台上允许最大效率。
算法思想
这里我理解的是该算法的目的是为了控制最大并发数。
其主要思想是,有固定数量的时隙,每次请求之前必须获取一个时隙或多个时隙,否则等待,直到可用的时隙数量满足需求时才能进行下一步操作。
源码实现
也可以从此处获得
//Throttle.h
#ifndef THROTTLE_THROTTLE_H
#define THROTTLE_THROTTLE_H
#include <atomic>
#include <mutex>
#include <condition_variable>
class Throttle {
std::atomic<int64_t> count = {0}, max = {0};
std::mutex lock;
std::condition_variable conds;
public:
Throttle(int64_t m = 0):max(m){};
~Throttle(){};
private:
//返回Ture
bool _should_wait(int64_t c) const {
int m = max;
int cur = count;
return m && \
((c <= m && cur + c > m) //满足该条件需要等待
|| (c >= m && cur > m)); //满足该条件将永远等待
}
bool _wait(int64_t c, std::unique_lock<std::mutex>& l);
public:
int64_t get_count(){ return count;}
/**
* @param c 待取的数量
* @return 返回已经被取的数量
*/
int64_t take(int64_t c = 1);
/**
* @param c 待取的数量
* @return 如果这个请求由于throttle被堵塞,返回ture, 否则返回false
*/
bool get(int64_t c = 1);
/**
* @param c 待取的数量
* @return 返回ture表示已经成功取到,返回fasle表示需要等待
*/
bool get_or_fail(int64_t c = 1);
/**
* @param c 要返回的数量
* @return 返回剩余请求已取的数量
*/
int64_t put(int64_t c = 1);
};
#endif //THROTTLE_THROTTLE_H
//Throttle.cpp
#include "Throttle.h"
#include <assert.h>
#include <iostream>
int64_t Throttle::take(int64_t c) {
if(0 == max) return 0;
{
std::lock_guard<std::mutex> l(lock);
count += c;
}
return count;
}
bool Throttle::get(int64_t c) {
if(0 == max) {
return false;
}
assert(c > 0);
bool waited = false;
{
std::unique_lock<std::mutex> l(lock);
if(_should_wait(c))
{
waited = true;
conds.wait(l, [this, c](){return (!_should_wait(c));}); //这里的写法是为了避免虚假唤醒
}
count += c;
}
return waited;
}
bool Throttle::get_or_fail(int64_t c) {
if(0 == max) {
return true;
}
assert(c > 0);
std::lock_guard<std::mutex> l(lock);
if(_should_wait(c)) {
return false;
}else {
count += c;
}
return true;
}
int64_t Throttle::put(int64_t c) {
if(0 == max) {
return 0;
}
std::lock_guard<std::mutex> l(lock);
if(0 != c)
{
assert(count >= c);
count -= c;
conds.notify_one();
}
return count;
}
//main.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include "Throttle.h"
#include <mutex>
std::mutex lock;
Throttle op_throttle(12);
void task(int64_t i){
{
std::lock_guard<std::mutex> l(lock);
std::cout << "started... The task id is " << i << std::endl;
}
if(!op_throttle.get_or_fail(3)) {
op_throttle.get(3);
}
{
std::lock_guard<std::mutex> l(lock);
std::cout << "processing... The task id is " << i << ",current hold:" << op_throttle.get_count() << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1+rand()&7));
{
std::lock_guard<std::mutex> l(lock);
std::cout << "finished... The task id is " << i << ",current hold:" << op_throttle.put(3) << std::endl;
}
}
int main() {
srand((int)time(0));
std::thread t1(task,1);
std::thread t2(task,2);
std::thread t3(task,3);
std::thread t4(task,4);
std::thread t5(task,5);
std::thread t6(task,6);
std::thread t7(task,7);
std::thread t8(task,8);
std::thread t9(task,9);
std::thread t10(task,10);
std::thread t11(task,11);
t11.join();
t10.join();
t9.join();
t8.join();
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
t7.join();
return 0;
}
可能出现的结果
started... The task id is 1
processing... The task id is 1,current hold:3
started... The task id is 2
processing... The task id is 2,current hold:6
started... The task id is 3
processing... The task id is 3,current hold:9
started... The task id is 4
processing... The task id is 4,current hold:12
started... The task id is 5
started... The task id is 6
started... The task id is 8
started... The task id is 9
started... The task id is 7
started... The task id is 10
started... The task id is 11
finished... The task id is 2,current hold:9
processing... The task id is 5,current hold:12
finished... The task id is 3,current hold:9
finished... The task id is 4,current hold:6
processing... The task id is 6,current hold:9
processing... The task id is 8,current hold:12
finished... The task id is 1,current hold:9
processing... The task id is 9,current hold:12
finished... The task id is 9,current hold:9
processing... The task id is 7,current hold:12
finished... The task id is 6,current hold:9
finished... The task id is 5,current hold:9
processing... The task id is 10,current hold:12
processing... The task id is 11,current hold:12
finished... The task id is 8,current hold:9
finished... The task id is 10,current hold:6
finished... The task id is 11,current hold:3
finished... The task id is 7,current hold:0