0 paxos算法解决了什么问题
现在有n个人组成提一个会议,这个会议的目的是为了确定今年的税率,那么每个人都会提出自己认为的今年的合理的税率,为了大家能够达成一致,有了paxos算法。实际里,这个会议就是一个集群。
1 paxos算法详解
1.1 基本概念
-
- 角色:
提议者(proposer):提议的发起者。
批准者(acceptor):对提议进行批准。
学习者(learner):作为一致性协议的副本因子,对被超过半数的 acceptor
批准的方案进行学习。
- 角色:
-
- 提议(propose):
Proposer(提议者)可以提出提议,最终要达成一致的 value 就在提议里。Acceptor(接收者)根据提议的编号来选择是否接受(accept)提议。如果超过半数的cceptor 接收了一个提议,那么这个提议就被接受(accepted)了,提议里的 value 也就被选定了。
- 提议(propose):
-
- 仲裁集合(Quorums): 是 Acceptor(假设有 N 个)的一个子集,任何两个 Quorums 至少有一个相同的成员,也就是说一个 quorums 是一个包含了超过 N/2+1 个 Acceptor 的一个集合。
-
- 提议序号(Proposal number)和批准值(agreed value):对于任意一个给定的提议者,其给定的提议对应的提议号必定全局唯一。每一个提议(n,v 代表的话)都有一个提议序号(n)和其想要通过的提议值(v),为了便于理解,设想一个权力机关去决定今年的税率,那么每个人会给自己的提议有一个全局唯一的序号(n),然后还包含对于想要决定的目标变量(税率)的值(n)也就是税率的大小。
1.2 算法具体流程
Paxos(这里主要说的是 Basic paxos 算法)的具体流程分为两阶段:
Phase1
- (1) Phase 1a - proposer 准备(Prepare):
Proposer 创建一个条消息,我们将其称为“Prepare”,以数字 n 标识。请注意,n 不是要提议的值也不是可能会被同意的商定的值,而只是一个数字,该数字由提议者唯一标识此初始消息(发送给接收者)。数字 n 必须大于此提议者在先前的任何 Prepare 消息中使用的任何数字。然后,它将包含 n 的 Prepare消息发送到接受方的一个仲裁集合(超过一半以上的 Acceptor 的集合)。请注意,Prepare 消息仅包含数字 n(也就是说,它不必包含例如建议的值,通常用 v 表示)。提议者决定谁在仲裁集合。如果提议者不能与至少一个接受者的仲裁集合进行通信,则他不应启动 Paxos。 - (2) Phase 1b - acceptor 承诺(Promise):
任何接受者都在等待来自任何提议者的准备消息。如果接受方收到一条准备消息,则接受方必须查看刚刚收到的准备消息的标识符编号 n。有两种情况: -
- 如果 n 高于接受者从任何提议者接收到的每个先前的提议编号,那么接受者必须向提议者返回一条消息,我们称其为“承诺”,以忽略所有将来的提议数量少的提议比 n。如果接受者在过去某个时候接受了提议者的提议(也就是第二阶段中批准的提议),则在对提议者的答复中必须包含先前接受的的提议编号(例如 m)和相应的接受(批准)值(例如 w)。
-
- 否则(即,n 小于或等于接受者从任何提议者收到的任何先前提议编号),接受者可以忽略收到的提议。在这种情况下,Paxos 不必工作。但是,为了优化起见,发送拒绝(Nack)响应将告诉提议者它停止以 n 作为提议的序号去建立共识(这是优化手段,因为即使不主动告诉,其提议的序号也会增长)。
Phase2
- (1) Phase 2a - proposer 发送 Accept 消息:
- 1 如果提议者从接受者的一个仲裁集合中获得大部分承诺,则需要为其提议设置值 v。
- 2 如果任何接受者先前已接受过一个提议,那么他们会将这个提议发送给提议者,提议者现在必须将其提议值 v 设置为与接受者报告的(与这些接受者最高提议编号关联的)提议值(也就是提议者之前接受过的提议),我们称之为 z。
- 3 如果到目前为止,没有一个接受者接受提议,则提
议者可以选择其最初想要提议的值,例如 x。在这个阶段,提议者会发送一个 Accept 信息:(n,v)给一个接受者个仲裁集合。(n 就是之前提议者发给接受者的准备信息里的提议里的提议序号。v=z 或 者 v=x (当所有接受者都没有接受过提议时。)),这个 accept 请求可以理解为一个请求:“请接受这个提议!”。
- (2) Phase 2b - acceptor 发送 Accepted 消息:
如果接受者从提议者接收到 Accept 信息(n,v),分为两种情况:- 1 如果:它尚未承诺(在 Paxos 协议的阶段 1b 中)仅考虑提议序号大于 n 的提议时,则它应该将(刚接收到的 Accept 消息的)值 v 注册为(协议)的接受值,并向提议者和每个学习者(通常是提议者本身)发送一条接受消息。
- 2 否则:它可以忽略这个 Accept 消息。
1.3 paxos算法的活锁问题
如上图,(从上图也可以看出集群里的一个机器可以有多重身份)首先 s1-3 给出 3 个 preparemeesage,然后 s1-3 是一个 quorum 得到了大多数支持,开始发送 phase2-a 的 accept 信息(A3.1),但是由于在发送phase2-a 的 accept 消息之前,s3-5 给出 3 个 prepare message,s3 又更新了自己主张的提议,那么之前发送的(accept 信息)A3.1 就不会被回应,之后 s1-2 发现由于超过半数(s3-5)拒绝了自己,然后就出了新的一轮 prepare message 为 P4.1,
然后,就这样循环往复下去出现了活锁。
其改进方法:
- 解决方案1:出现冲突的时候,在重新开始执行prepare message流程之前,施加随机的延迟;让其他proposers有机会完成value的确认
- 解决方案2:multi-paxos会使用leader来避免活锁;
2 paxos算法实现
2.1 paxos.cpp
模拟paxos算法的主要流程
#include <stdlib.h>
#include <stdio.h>
#include "Paxos/Acceptor.h"
#include "Paxos/Proposer.h"
#include "lib/Thread.h"
#include "lib/Lock.h"
#include "lib/mapi.h"
#include "lib/atom.h"
#include "lib/Logger.h"
paxos::Proposer p[5];
paxos::Acceptor a[11];
mdk::Mutex l[11];
int finishedCount = 0;
int finalValue = -1;
bool isFinished = false;
mdk::uint64 g_start;
mdk::Logger g_log;
void* Proposer(void *id)
{
mdk::Logger log;
char logName[256];
sprintf( logName, "Proposer%d", (long)id );
log.SetLogName(logName);
log.SetMaxLogSize(10);
log.SetMaxExistDay(30);
log.SetPrintLog(false);
paxos::Proposer &proposer = p[(long)id];
paxos::PROPOSAL value = proposer.GetProposal();
paxos::PROPOSAL lastValue;
int acceptorId[11];
int count = 0;
mdk::uint64 start = mdk::MillTime();
while ( true )
{
value = proposer.GetProposal();//拿到提议
log.Info("Info", "Proposer%d号开始(Propose阶段):提议=[编号:%d,提议:%d]\n",
(long)id, value.serialNum, value.value);
count = 0;
int i = 0;
for (i = 0; i < 11; i++ )
{
/*
* 发送消息到第i个acceptor
* 经过一定时间达到acceptor,sleep(随机时间)模拟
* acceptor处理消息,mAcceptors[i].Propose()
* 回应proposer
* 经过一定时间proposer收到回应,sleep(随机时间)模拟
* proposer处理回应mProposer.proposed(ok, lastValue)
*/
mdk::m_sleep(rand()%500);//经过随机时间,消息到达了mAcceptors[i]
//处理消息
l[i].Lock();
bool ok = a[i].Propose(value.serialNum, lastValue);
l[i].Unlock();
mdk::m_sleep(rand()%500);//经过随机时间,消息到达Proposer
//处理Propose回应
if ( !proposer.Proposed(ok, lastValue) ) //重新开始Propose阶段
{
mdk::m_sleep(1000);//为了降低活锁,多等一会让别的proposer有机会完成自己的2阶段批准
break;
}
paxos::PROPOSAL curValue = proposer.GetProposal();//拿到提议
if ( curValue.value != value.value )//acceptor本次回应可能推荐了一个提议
{
log.Info("Info", "Proposer%d号修改了提议:提议=[编号:%d,提议:%d]\n",
(long)id, curValue.serialNum, curValue.value);
break;
}
acceptorId[count++] = i;//记录愿意投票的acceptor
if ( proposer.StartAccept() ) break;
}
//检查有没有达到Accept开始条件,如果没有表示要重新开始Propose阶段
if ( !proposer.StartAccept() ) continue;
//开始Accept阶段
//发送Accept消息到所有愿意投票的acceptor
value = proposer.GetProposal();
log.Info("Info", "Proposer%d号开始(Accept阶段):提议=[编号:%d,提议:%d]\n",
(long)id, value.serialNum, value.value);
for (i = 0; i < count; i++ )
{
//发送accept消息到acceptor
//减少accept阶段等待时间,加快收敛
mdk::m_sleep(rand()%200);//经过随机时间,accept消息到达acceptor
//处理accept消息
l[acceptorId[i]].Lock();
bool ok = a[acceptorId[i]].Accept(value);
l[acceptorId[i]].Unlock();
mdk::m_sleep(rand()%200);//经过随机时间,accept回应到达proposer
//处理accept回应
if ( !proposer.Accepted(ok) ) //重新开始Propose阶段
{
mdk::m_sleep(1000);//为了降低活锁,多等一会让别的proposer有机会完成自己的2阶段批准
break;
}
if ( proposer.IsAgree() )//成功批准了提议
{
start = mdk::MillTime() - start;
log.Info("Info", "Proposer%d号的提议被批准,用时%lluMS:最终提议 = [编号:%d,提议:%d]\n", (long)id, start, value.serialNum, value.value);
g_log.Info("Info", "Proposer%d号的提议被批准,用时%lluMS:最终提议 = [编号:%d,提议:%d]\n", (long)id, start, value.serialNum, value.value);
if(finalValue == -1) finalValue = value.value;
else if(finalValue != value.value) finalValue = 0;
if ( 4 == mdk::AtomAdd(&finishedCount, 1) )
{
isFinished = true;
g_start = mdk::MillTime() - g_start;
if(finalValue > 0){
g_log.Info("Info", "Paxos完成,用时%lluMS,最终通过提议值为:%d\n", g_start, finalValue);
}
else{
g_log.Info("Info", "Paxos完成,用时%lluMS,最终结果不一致!\n", g_start);
}
}
return NULL;
}
}
}
return NULL;
}
//Paxos过程模拟演示程序
int main(int argc, char* argv[])
{
int i = 0;
g_log.SetLogName("Paxos");
g_log.SetMaxLogSize(10);
g_log.SetMaxExistDay(30);
g_log.SetPrintLog(true);
g_log.Info("Info", "5个Proposer, 11个Acceptor准备进行Paxos\n"
"每个Proposer独立线程,Acceptor不需要线程\n"
"Proposer编号从0-10,编号为i的Proposer初始提议编号和提议值是(i+1, i+1)\n"
"Proposer每次重新提议会将提议编号增加5\n"
"Proposer被批准后结束线程,其它线程继续投票最终,全部批准相同的值,达成一致。\n");
g_start = mdk::MillTime();
g_log.Info("Info", "Paxos开始\n" );
paxos::PROPOSAL value;
for ( i = 0; i < 5; i++ )
{
p[i].SetPlayerCount(5, 11);
value.serialNum = value.value = i + 1;
p[i].StartPropose(value);
}
mdk::Thread t[5];
for ( i = 0; i < 5; i++ ) t[i].Run(Proposer, (void*)i);
//for ( i = 0; i < 5; i++ ) t[i].WaitStop();
while(true){
if(isFinished) break;
mdk::m_sleep(500);
}
return 0;
}
2.2 proposer.cpp
模拟提议者的主要流程
#include "Proposer.h"
namespace paxos
{
Proposer::Proposer()
{
SetPlayerCount(0, 0);
}
Proposer::Proposer(short proposerCount, short acceptorCount)
{
SetPlayerCount(proposerCount, acceptorCount);
}
Proposer::~Proposer()
{
}
void Proposer::SetPlayerCount(short proposerCount, short acceptorCount)
{
m_proposerCount = proposerCount;
m_acceptorCount = acceptorCount;
return;
}
void Proposer::StartPropose(PROPOSAL &value)
{
m_value = value;
m_proposeFinished = false;
m_isAgree = false;
m_maxAcceptedSerialNum = 0;
m_okCount = 0;
m_refuseCount = 0;
m_start = time(NULL);
return;
}
PROPOSAL& Proposer::GetProposal()
{
return m_value;
}
/**
//提议者
class Proposer
{
public:
Proposer();
Proposer(short proposerCount, short acceptorCount);
virtual ~Proposer();
//设置参与者数量
void SetPlayerCount(short proposerCount, short acceptorCount);
//开始Propose阶段
void StartPropose(PROPOSAL &value);
//取得提议
PROPOSAL& GetProposal();
//提议被投票,Proposed失败则重新开始Propose阶段
bool Proposed(bool ok, PROPOSAL &lastAcceptValue);
//开始Accept阶段,满足条件成功开始accept阶段返回ture,不满足开始条件返回false
bool StartAccept();
//提议被接受,Accepted失败则重新开始Propose阶段
bool Accepted(bool ok);
//提议被批准
bool IsAgree();
private:
short m_proposerCount;///proposer数量
short m_acceptorCount;//acceptor数量
PROPOSAL m_value;//预备提议
bool m_proposeFinished;//完成拉票,准备开始二阶段
bool m_isAgree;//m_value被批准
unsigned int m_maxAcceptedSerialNum;//已被接受的提议中流水号最大的
time_t m_start;//阶段开始时间,阶段一,阶段二共用
short m_okCount;//投票数量,阶段一,阶段二共用
short m_refuseCount;//拒绝数量,阶段一,阶段二共用
};
//提议数据结构
typedef struct PROPOSAL
{
unsigned int serialNum;//流水号,1开始递增,保证全局唯一
unsigned int value;//提议内容
}PROPOSAL;
**/
bool Proposer::Proposed(bool ok, PROPOSAL &lastAcceptValue)
{
if ( m_proposeFinished ) return true;//可能是一阶段迟到的回应,直接忽略消息
if ( !ok )
{
m_refuseCount++;
//已有半数拒绝,不需要等待其它acceptor投票了,重新开始Propose阶段
//使用StartPropose(m_value)重置状态
//请完善下面逻辑
/**********Begin**********/
if ( m_refuseCount > m_acceptorCount/2 ){
// 重新开始投票阶段,将提案号自增
m_value.serialNum += 5;
StartPropose(m_value);
return false;
}
/**********End**********/
//拒绝数不到一半
return true;
}
m_okCount++;
/*
没有必要检查分支:serialNum为null
因为serialNum>m_maxAcceptedSerialNum,与serialNum非0互为必要条件
*/
//如果已经有提议被接受,修改成已被接受的提议
//请完善下面逻辑
/**********Begin**********/
if(lastAcceptValue.serialNum > m_maxAcceptedSerialNum){
m_value.value = lastAcceptValue.value;
m_maxAcceptedSerialNum = lastAcceptValue.serialNum;
return true;
}
/**********End**********/
//如果自己的提议被接受
if ( m_okCount > m_acceptorCount / 2 )
{
m_okCount = 0;
m_proposeFinished = true;
}
return true;
}
bool Proposer::StartAccept()
{
return m_proposeFinished;
}
bool Proposer::Accepted(bool ok)
{
if ( !m_proposeFinished ) return true;//可能是上次第二阶段迟到的回应,直接忽略消息
if ( !ok )
{
m_refuseCount++;
//已有半数拒绝,不需要等待其它acceptor投票了,重新开始Propose阶段
//使用StartPropose(m_value)重置状态
//请完善下面逻辑
/**********Begin**********/
if ( m_refuseCount > m_acceptorCount/2 ){
// 重新开始投票阶段,将提案号自增
m_value.serialNum += 5;
StartPropose(m_value);
return false;
}
/**********End**********/
return true;
}
m_okCount++;
if ( m_okCount > m_acceptorCount / 2 ) m_isAgree = true;
return true;
}
bool Proposer::IsAgree()
{
return m_isAgree;
}
}
2.3 acceptor.cpp
模拟接受者的主要流程
#include "Acceptor.h"
namespace paxos
{
Acceptor::Acceptor(void)
{
m_maxSerialNum = 0;
m_lastAcceptValue.serialNum = 0;
m_lastAcceptValue.value = 0;
}
Acceptor::~Acceptor(void)
{
}
/***
//投票者
class Acceptor
{
public:
Acceptor(void);
virtual ~Acceptor(void);
//同意投票
bool Propose(unsigned int serialNum, PROPOSAL &lastAcceptValue);
//接受提议
bool Accept(PROPOSAL &value);
private:
PROPOSAL m_lastAcceptValue;//最后接受的提议
unsigned int m_maxSerialNum;//Propose提交的最大流水号
};
//提议数据结构
typedef struct PROPOSAL
{
unsigned int serialNum;//流水号,1开始递增,保证全局唯一
unsigned int value;//提议内容
}PROPOSAL;
***/
bool Acceptor::Propose(unsigned int serialNum, PROPOSAL &lastAcceptValue)
{
if ( 0 == serialNum ) return false;
//提议不通过
if ( m_maxSerialNum > serialNum ) return false;
//接受提议
//请完善下面逻辑
/**********Begin**********/
//m_lastAcceptValue = lastAcceptValue;
// If n is higher than every previous proposal number received, from any of the Proposers, by the Acceptor,
// then the Acceptor must return a message, which we call a "Promise", to the Proposer, to ignore all future
// proposals having a number less than n. If the Acceptor accepted a proposal at some point in the past, it
// must include the previous proposal number, say m, and the corresponding accepted value, say w, in its response to the Proposer.
m_maxSerialNum = serialNum;
lastAcceptValue = m_lastAcceptValue; // which contains m -> w
/**********End**********/
return true;
}
bool Acceptor::Accept(PROPOSAL &value)
{
if ( 0 == value.serialNum ) return false;
//Acceptor又重新答应了其他提议
//请完善下面逻辑
/**********Begin**********/
if (m_maxSerialNum > value.serialNum ){
return false;
}
// 然后接受新的提案
m_lastAcceptValue = value;
/**********End**********/
//批准提议通过
//请完善下面逻辑
/**********Begin**********/
m_maxSerialNum = value.serialNum;
/**********End**********/
return true;
}
}
3 参考
[1] https://en.wikipedia.org/wiki/Paxos_(computer_science)#Byzantine_Paxos
[2] https://www.cs.rutgers.edu/~pxk/417/notes/paxos.html
[3] https://github.com/HaHaJeff/Paxos/blob/master/basic_paxos.md