本来是做python 开发的老板最近让做区块链也没有头绪从网上找了点视频看着照的敲了了下感觉是自己写过最多的代码啦 怪我才疏学浅 哈哈哈--
import threading, socket, pickle, queue
class Message(object):
MSG_ACCEPTOR_AGREE = 0
MSG_ACCEPTOR_ACCEPT = 1
MSG_ACCEPTOR_REJECT = 2
MSG_ACCEPTOR_UNACCEPT = 3
MSG_ACCEPT = 4
MSG_PROPOSE = 5
MSG_EXT_PROPOSE = 6
MSG_HEARTBEAT = 7
def __init__(self, command=None):
self.command = command
def copyAsReply(self, message):
self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
self.value = message.value
class MessagePump(threading.Thread):
#“”“messagepump封装socket连接,并负责送邮件的主人”“”
class MPHelper(threading.Thread):
#“”这个帮助类的原因是要尽可能快地从套接字上拔出东西,以避免
#填充缓冲区。回想起来,使用TCP可能更容易些:“”
def __init__(self, owner):
self.owner = owner
threading.Thread.__init__(self)
def run(self):
while not self.owner.abort:
try:
(bytes, addr) = self.owner.socket.recvfrom(2048)
msg = pickle.loads(bytes)
msg.source = addr[1]
self.owner.queue.put(msg)
except:
pass
def __init__(self, owner, port, timeout=2):
self.owner = owner
threading.Thread.__init__(self)
self.abort = False
self.timeout = 2
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000)
self.socket.bind(("localhost", port))
self.socket.settimeout(timeout)
self.queue = queue.Queue()
self.helper = MessagePump.MPHelper(self)
def run(self):
self.helper.start()
while not self.abort:
message = self.waitForMessage()
# 这需要被阻塞,否则有一个世界
#多线程在等待我们 # 疼痛
self.owner.recvMessage(message)
def waitForMessage(self):
try:
msg = self.queue.get(True, 3)
return msg
except: # ugh, specialise the exception!
return None
def sendMessage(self, message):
bytes = pickle.dumps(message)
address = ("localhost", message.to)
self.socket.sendto(bytes, address)
return True
def doAbort(self):
self.abort = True
import random
class AdversarialMessagePump(MessagePump):
#“”对抗性消息泵随机延迟消息并以任意顺序传递它们“”
def __init__(self, owner, port, timeout=2):
MessagePump.__init__(self, owner, port, timeout)
self.messages = set()
def waitForMessage(self):
try:
msg = self.queue.get(True, 0.1)
self.messages.add(msg)
except: # ugh, specialise the exception!
pass
if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
msg = random.choice(list(self.messages))
self.messages.remove(msg)
else:
msg = None
return msg
class InstanceRecord(object):
#这是一个簿记类,它记录了我们在某一特定记录中所看到或承诺的所有建议,承兑人和领导者“
def __init__(self):
self.protocols = {}
self.highestID = (-1, -1)
self.value = None
def addProtocol(self, protocol):
self.protocols[protocol.proposalID] = protocol
if protocol.proposalID[1] > self.highestID[1] or (
protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
self.highestID = protocol.proposalID
def getProtocol(self, protocolID):
return self.protocols[protocolID]
def cleanProtocols(self):
keys = self.protocols.keys()
for k in keys:
protocol = self.protocols[k]
if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
print( "Deleting protocol")
del self.protocols[k]
class PaxosLeader(object):
def __init__(self, port, leaders=None, acceptors=None):
self.port = port
if leaders == None:
self.leaders = []
else:
self.leaders = leaders
if acceptors == None:
self.acceptors = []
else:
self.acceptors = acceptors
self.group = self.leaders + self.acceptors
self.isPrimary = False
self.proposalCount = 0
self.msgPump = MessagePump(self, port)
self.instances = {}
self.hbListener = PaxosLeader.HeartbeatListener(self)
self.hbSender = PaxosLeader.HeartbeatSender(self)
self.highestInstance = -1
self.stopped = True
# 上一次我们试图弥补任何差距
self.lasttime = time.time()
# ------------------------------------------------------
# 这两类侦听来自其他领导人的心跳
# ,如果没有出现,告诉领导应该
# 是主要的
class HeartbeatListener(threading.Thread):
def __init__(self, leader):
self.leader = leader
self.queue = queue.Queue()
self.abort = False
threading.Thread.__init__(self)
def newHB(self, message):
self.queue.put(message)
def doAbort(self):
self.abort = True
def run(self):
elapsed = 0
while not self.abort:
s = time.time()
try:
hb = self.queue.get(True, 2)
# 简单的方法来解决冲突——如果你的端口号,比我的大,
# 你成为领导者
if hb.source > self.leader.port:
self.leader.setPrimary(False)
except: # Nothing was got
self.leader.setPrimary(True)
class HeartbeatSender(threading.Thread):
def __init__(self, leader):
self.leader = leader
self.abort = False
threading.Thread.__init__(self)
def doAbort(self):
self.abort = True
def run(self):
while not self.abort:
time.sleep(1)
if self.leader.isPrimary:
msg = Message(Message.MSG_HEARTBEAT)
msg.source = self.leader.port
for l in self.leader.leaders:
msg.to = l
self.leader.sendMessage(msg)
# ------------------------------------------------------
def sendMessage(self, message):
self.msgPump.sendMessage(message)
def start(self):
self.hbSender.start()
self.hbListener.start()
self.msgPump.start()
self.stopped = False
def stop(self):
self.hbSender.doAbort()
self.hbListener.doAbort()
self.msgPump.doAbort()
self.stopped = True
def setPrimary(self, primary):
if self.isPrimary != primary:
# Only print if something's changed
if primary:
print( "I (%s) am the leader" % self.port)
else:
print("I (%s) am NOT the leader" % self.port)
self.isPrimary = primary
# ------------------------------------------------------
def getGroup(self):
return self.group
def getLeaders(self):
return self.leaders
def getAcceptors(self):
return self.acceptors
def getQuorumSize(self):
return (len(self.getAcceptors()) / 2) + 1
def getInstanceValue(self, instanceID):
if instanceID in self.instances:
return self.instances[instanceID].value
return None
def getHistory(self):
return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]
def getNumAccepted(self):
return len([v for v in self.getHistory() if v != None])
# ------------------------------------------------------
def findAndFillGaps(self):
# 如果没有收到消息,我们就利用这个机会做一点清理工作。
for i in range(1, self.highestInstance):
if self.getInstanceValue(i) == None:
print("Filling in gap", i)
self.newProposal(0,
i) # 这要么最终提交一个已经接受的值,要么填充0或没有OP的空白。
self.lasttime = time.time()
def garbageCollect(self):
for i in self.instances:
self.instances[i].cleanProtocols()
def recvMessage(self, message):
"""消息泵会周期性地调用它,即使没有消息可用。"""
if self.stopped: return
if message == None:
# 只有每天15s否则你就切好协议了风华正茂的风险:(
if self.isPrimary and time.time() - self.lasttime > 15.0:
self.findAndFillGaps()
self.garbageCollect()
return
if message.command == Message.MSG_HEARTBEAT:
self.hbListener.newHB(message)
return True
if message.command == Message.MSG_EXT_PROPOSE:
print("External proposal received at", self.port, self.highestInstance)
if self.isPrimary:
self.newProposal(message.value)
# 别人忽略我们得到的建议时,我们不是主要的
# 做,我们应该如果我们是善良,是一个消息说“领导回复了”
# 并给予一个新的地址。然而,我们也可能失败了。
return True
if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
# 可能,虽然我们仍然认为我们的主,我们会得到一个
# 接受消息,我们只是在听。
# 我们听力都接受感兴趣,所以我们一起玩,假装我们有协议
# ,越来越接受和听的一个群体和往常一样
if message.command == Message.MSG_ACCEPTOR_ACCEPT:
if message.instanceID not in self.instances:
self.instances[message.instanceID] = InstanceRecord()
record = self.instances[message.instanceID]
if message.proposalID not in record.protocols:
protocol = PaxosLeaderProtocol(self)
# 我们只是按摩这个协议,在等待接受状态。
protocol.state = PaxosLeaderProtocol.STATE_AGREED
protocol.proposalID = message.proposalID
protocol.instanceID = message.instanceID
protocol.value = message.value
record.addProtocol(protocol)
else:
protocol = record.getProtocol(message.proposalID)
# 如果我们启动了这个协议实例,就应该到达这里
protocol.doTransition(message)
return True
def newProposal(self, value, instance=None):
protocol = PaxosLeaderProtocol(self)
if instance == None:
self.highestInstance += 1
instanceID = self.highestInstance
else:
instanceID = instance
self.proposalCount += 1
id = (self.port, self.proposalCount)
if instanceID in self.instances:
record = self.instances[instanceID]
else:
record = InstanceRecord()
self.instances[instanceID] = record
protocol.propose(value, id, instanceID)
record.addProtocol(protocol)
def notifyLeader(self, protocol, message):
#协议完成后调用这个
if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
print( "Protocol instance %s accepted with value %s" % (message.instanceID, message.value))
self.instances[message.instanceID].accepted = True
self.instances[message.instanceID].value = message.value
self.highestInstance = max(message.instanceID, self.highestInstance)
return
if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
# 看消息找到的值,然后重试
# 最终,假设受众会接受一些价值
# 这一实例,该协议将完成。
self.proposalCount = max(self.proposalCount, message.highestPID[1])
self.newProposal(message.value)
return True
if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
pass
class PaxosLeaderProtocol(object):
# State variables
STATE_UNDEFINED = -1
STATE_PROPOSED = 0
STATE_AGREED = 1
STATE_REJECTED = 2
STATE_ACCEPTED = 3
STATE_UNACCEPTED = 4
def __init__(self, leader):
self.leader = leader
self.state = PaxosLeaderProtocol.STATE_UNDEFINED
self.proposalID = (-1, -1)
self.agreecount, self.acceptcount = (0, 0)
self.rejectcount, self.unacceptcount = (0, 0)
self.instanceID = -1
self.highestseen = (0, 0)
def propose(self, value, pID, instanceID):
self.proposalID = pID
self.value = value
self.instanceID = instanceID
message = Message(Message.MSG_PROPOSE)
message.proposalID = pID
message.instanceID = instanceID
message.value = value
for server in self.leader.getAcceptors():
message.to = server
self.leader.sendMessage(message)
self.state = PaxosLeaderProtocol.STATE_PROPOSED
return self.proposalID
def doTransition(self, message):
"""We run the protocol like a simple state machine. It's not always
okay to error on unexpected inputs, however, due to message delays, so we silently
ignore inputs that we're not expecting."""
"""我们像简单的状态机那样运行协议。并非总是如此。
可以在意外的输入错误,但是,由于消息延迟,所以我们默默地
忽略我们不期望的输入。"""
if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
if message.command == Message.MSG_ACCEPTOR_AGREE:
self.agreecount += 1
if self.agreecount >= self.leader.getQuorumSize():
# print "Achieved agreement quorum, last value replied was:", message.value
if message.value != None: # If it's none, can do what we like. Otherwise we have to take the highest seen proposal
if message.sequence[0] > self.highestseen[0] or (
message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
1]):
self.value = message.value
self.highestseen = message.sequence
self.state = PaxosLeaderProtocol.STATE_AGREED
# Send 'accept' message to group
msg = Message(Message.MSG_ACCEPT)
msg.copyAsReply(message)
msg.value = self.value
msg.leaderID = msg.to
for s in self.leader.getAcceptors():
msg.to = s
self.leader.sendMessage(msg)
self.leader.notifyLeader(self, message)
return True
if message.command == Message.MSG_ACCEPTOR_REJECT:
self.rejectcount += 1
if self.rejectcount >= self.leader.getQuorumSize():
self.state = PaxosLeaderProtocol.STATE_REJECTED
self.leader.notifyLeader(self, message)
return True
if self.state == PaxosLeaderProtocol.STATE_AGREED:
if message.command == Message.MSG_ACCEPTOR_ACCEPT:
self.acceptcount += 1
if self.acceptcount >= self.leader.getQuorumSize():
self.state = PaxosLeaderProtocol.STATE_ACCEPTED
self.leader.notifyLeader(self, message)
if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
self.unacceptcount += 1
if self.unacceptcount >= self.leader.getQuorumSize():
self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
self.leader.notifyLeader(self, message)
pass
class PaxosAcceptor(object):
def __init__(self, port, leaders):
self.port = port
self.leaders = leaders
self.instances = {}
self.msgPump = MessagePump(self, self.port)
self.failed = False
def start(self):
self.msgPump.start()
def stop(self):
self.msgPump.doAbort()
def fail(self):
self.failed = True
def recover(self):
self.failed = False
def sendMessage(self, message):
self.msgPump.sendMessage(message)
def recvMessage(self, message):
if message == None: return
if self.failed:
return # Failure means ignored and lost messages
if message.command == Message.MSG_PROPOSE:
if message.instanceID not in self.instances:
record = InstanceRecord()
self.instances[message.instanceID] = record
protocol = PaxosAcceptorProtocol(self)
protocol.recvProposal(message)
self.instances[message.instanceID].addProtocol(protocol)
else:
self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)
def notifyClient(self, protocol, message):
if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
self.instances[protocol.instanceID].value = message.value
# print "Proposal accepted at client: ", message.value
def getHighestAgreedProposal(self, instance):
return self.instances[instance].highestID
def getInstanceValue(self, instance):
return self.instances[instance].value
class PaxosAcceptorProtocol(object):
# State variables
STATE_UNDEFINED = -1
STATE_PROPOSAL_RECEIVED = 0
STATE_PROPOSAL_REJECTED = 1
STATE_PROPOSAL_AGREED = 2
STATE_PROPOSAL_ACCEPTED = 3
STATE_PROPOSAL_UNACCEPTED = 4
def __init__(self, client):
self.client = client
self.state = PaxosAcceptorProtocol.STATE_UNDEFINED
def recvProposal(self, message):
if message.command == Message.MSG_PROPOSE:
self.proposalID = message.proposalID
self.instanceID = message.instanceID
# What's the highest already agreed proposal for this instance?
(port, count) = self.client.getHighestAgreedProposal(message.instanceID)
# Check if this proposal is numbered higher
if count < self.proposalID[0] or (count == self.proposalID[0] and port < self.proposalID[1]):
# Send agreed message back, with highest accepted value (if it exists)
self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
# print "Agreeing to proposal: ", message.instanceID, message.value
value = self.client.getInstanceValue(message.instanceID)
msg = Message(Message.MSG_ACCEPTOR_AGREE)
msg.copyAsReply(message)
msg.value = value
msg.sequence = (port, count)
self.client.sendMessage(msg)
else:
# Too late, we already told someone else we'd do it
# Send reject message, along with highest proposal id and its value
self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
return self.proposalID
else:
# error, trying to receive a non-proposal?
pass
def doTransition(self, message):
if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
# Could check on the value here, if we don't trust leaders to honour what we tell them
# send reply to leader acknowledging
msg = Message(Message.MSG_ACCEPTOR_ACCEPT)
msg.copyAsReply(message)
for l in self.client.leaders:
msg.to = l
self.client.sendMessage(msg)
self.notifyClient(message)
return True
raise Exception("Unexpected state / command combination!")
def notifyClient(self, message):
self.client.notifyClient(self, message)
import time
if __name__ == '__main__':
numclients = 5
clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
leader = PaxosLeader(54321, [54322], [c.port for c in clients])
leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])
leader.start()
leader.setPrimary(True)
leader2.setPrimary(True)
leader2.start()
for c in clients:
c.start()
clients[0].fail()
clients[1].fail()
# clients[2].fail( )
# Send some proposals through to test
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
start = time.time()
for i in range(1000):
m = Message(Message.MSG_EXT_PROPOSE)
m.value = 0 + i
m.to = 54322
bytes = pickle.dumps(m)
s.sendto(bytes, ("localhost", m.to))
while leader2.getNumAccepted() < 999:
print( "Sleeping for 1s -- accepted:", leader2.getNumAccepted())
time.sleep(1)
end = time.time()
print("Sleeping for 10s")
time.sleep(10)
print("Stopping leaders")
leader.stop()
leader2.stop()
print("Stopping clients")
for c in clients:
c.stop()
print("Leader 1 history: ", leader.getHistory())
print( "Leader 2 history: ", leader2.getHistory())
print(end - start)