学习 https://github.com/huoyu820125/SecondPaxos 之后,自己动手编写的网络版本。
中途有所耽搁,先在此处记录下代码,待后续完成。
1 // proposer.cpp: 定义控制台应用程序的入口点。 2 // 3 4 #include "stdafx.h" 5 #include <iostream> 6 #include <string> 7 #include <boost/array.hpp> 8 #include <boost/asio.hpp> 9 10 using boost::asio::ip::tcp; 11 12 const std::string default_port = "9687"; 13 const int acceptorCount = 11; 14 const int proposerCount = 11; 15 #define FIRST_FLAG "first" 16 #define SECOND_FLAG "second" 17 18 #pragma pack (1) 19 //提议数据结构 20 typedef struct PROPOSAL 21 { 22 unsigned int serialNum;//流水号,1开始递增,保证全局唯一 23 unsigned int value;//提议内容 24 }PROPOSAL; 25 #pragma pack() 26 27 28 29 int main() 30 { 31 boost::asio::io_context io_context; 32 33 tcp::resolver resolver(io_context); 34 tcp::resolver::results_type endpoints = 35 resolver.resolve("127.0.0.1", default_port.c_str()); 36 try { 37 tcp::socket socket(io_context); 38 boost::asio::connect(socket, endpoints); 39 40 for (;;) 41 { 42 char buf[8] = SECOND_FLAG; 43 boost::system::error_code error; 44 45 size_t len = socket.write_some(boost::asio::buffer(buf, sizeof(buf)), error); 46 47 if (error == boost::asio::error::eof) 48 break; // Connection closed cleanly by peer. 49 else if (error) 50 throw boost::system::system_error(error); // Some other error. 51 52 } 53 } 54 catch (std::exception& e) { 55 std::cerr << e.what() << std::endl; 56 } 57 58 59 return 0; 60 }
1 // acceptor.cpp: 定义控制台应用程序的入口点。 2 // 3 4 #include "stdafx.h" 5 #include <ctime> 6 #include <iostream> 7 #include <string> 8 #include <memory> 9 #include <mutex> 10 #include <thread> 11 #include <boost/asio.hpp> 12 13 using boost::asio::ip::tcp; 14 15 const int default_port = 9687; 16 const int acceptorCount = 11; 17 #define FIRST_FLAG "first" 18 #define SECOND_FLAG "second" 19 20 #pragma pack (1) 21 //提议数据结构 22 typedef struct PROPOSAL 23 { 24 unsigned int serialNum;//流水号,1开始递增,保证全局唯一 25 unsigned int value;//提议内容 26 }PROPOSAL; 27 #pragma pack() 28 29 //投票接受者 30 class Acceptor { 31 public: 32 Acceptor() { 33 m_maxSerialNum = 0; 34 m_lastAcceptValue.serialNum = 0; 35 m_lastAcceptValue.value = 0; 36 } 37 virtual ~Acceptor() {} 38 39 //同意投票 40 bool Propose(unsigned int serialNum, PROPOSAL &lastAcceptValue) { 41 std::lock_guard<std::mutex> lck(m_mtx); 42 if (0 == serialNum) return false; 43 if (m_maxSerialNum > serialNum) return false; 44 m_maxSerialNum = serialNum; 45 lastAcceptValue = m_lastAcceptValue; 46 47 return true; 48 } 49 50 //接受提议 51 bool Accept(PROPOSAL &value) { 52 if (0 == value.serialNum) return false; 53 if (m_maxSerialNum > value.serialNum) return false; 54 m_lastAcceptValue = value; 55 return true; 56 } 57 58 private: 59 Acceptor (const Acceptor&) = delete; 60 Acceptor operator=(const Acceptor&) = delete; 61 std::mutex m_mtx; 62 PROPOSAL m_lastAcceptValue;//最后接受的提议 63 unsigned int m_maxSerialNum;//Propose提交的最大流水号 64 }; 65 //==================================================================== 66 Acceptor aceptArray[acceptorCount]; 67 boost::asio::io_context io_context; 68 69 70 //====================================================================== 71 void HandleThreadFunc(std::shared_ptr<tcp::socket> p,int id) { 72 std::cout << "enter " __FUNCTION__ << std::endl; 73 Acceptor &acc = aceptArray[id]; 74 char buf[8] = { 0 }; 75 PROPOSAL pro; 76 boost::system::error_code ignored_error; 77 try { 78 boost::asio::read(*p, boost::asio::buffer(buf, 
sizeof(buf))); 79 80 if (strcmp(buf, FIRST_FLAG) == 0) { 81 boost::asio::read(*p, boost::asio::buffer(&pro, sizeof(pro))); 82 83 } 84 else if (strcmp(buf, SECOND_FLAG) == 0) { 85 std::cout << buf << std::endl; 86 } 87 } 88 catch (std::exception& e) { 89 std::cerr << e.what() << std::endl; 90 } 91 } 92 93 94 95 void MainLoop(int id) { 96 try { 97 tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), default_port+id)); 98 for (;;) { 99 std::shared_ptr<tcp::socket> psocket = std::make_shared<tcp::socket>(io_context); 100 acceptor.accept(*psocket); 101 102 std::thread t = std::thread(HandleThreadFunc, psocket,id); 103 t.detach(); 104 } 105 } 106 catch (std::exception& e) { 107 std::cerr << e.what() << std::endl; 108 } 109 } 110 111 int main() 112 { 113 std::thread t[acceptorCount]; 114 for (int i = 0; i < acceptorCount; i++) { 115 t[i] = std::thread(MainLoop,i); 116 } 117 118 for (int i = 0; i < acceptorCount; i++) { 119 t[i].join(); 120 } 121 122 return 0; 123 }