写在前面
最近项目中有用到boost库,这里就记录下学习过程。
一、什么是boost库
Boost是一个功能强大 , 构造精良 , 跨越平台 , 代码开源 , 完全免费的 C ++ 程序库。共包含 160 余个库 / 组件 , 涵盖字符串与文本处理、容器、迭代器、算法、图像处理、模板元编程、并发编程等多个领域。 由 c ++ 标准委员会成员发起倡议并建立 boost 社区 , C ++11 标准库中三分之二来自 boost , 并且将来还会有更多的库进入 c ++ 标准库 , 因此 boost 是一个 c ++ " 准 " 标准库。支持现有的所有操作系统。
二、使用boost实现socket客户端与服务端
2.1 客户端代码
BoostClient.h
#include "BoostClient.h"
BoostClient::BoostClient(io_service& service, ip::tcp::endpoint ep):m_ios(service)
{
pSocket.reset(new ip::tcp::socket(service));
boost::system::error_code ec;
pSocket->connect(ep, ec);
if(ec)
{
std::cerr << "Error connecting to server: " << ec.message() << std::endl;
bConnected = false;
return;
}
bConnected = true;
}
void BoostClient::AsyncRecvMessage()
{
memset(data_.data(), 0, sizeof(data_));
pSocket->async_read_some(buffer(data_), [this](const boost::system::error_code &ec, size_t bytes) {
if(!ec)
{
std::cout<<"recv size:"<<bytes<<std::endl;
std::cout<<"recv data:"<<data_.data()<<std::endl;
AsyncRecvMessage();
}
else if(ec == error::eof)
{
// 断开连接
bConnected = false;
std::cout<<"disconnected !"<<std::endl;
}
else
{
std::cout<<"read error: "<<ec.message()<<std::endl;
}
});
}
void BoostClient::AsyncSendMessage(std::string message)
{
if(message.empty() || !bConnected)
{
return;
}
async_write(*pSocket, buffer(message.c_str(), message.size()), [this](const boost::system::error_code &ec, size_t writed_bytes)
{
if (!ec)
{
}
else
{
std::cout <<"send error:"<<ec.message() << std::endl;
bConnected = false;
}
});
}
BoostClient.h
#include <iostream>
#include <array>
#include <boost/asio.hpp>
using namespace boost::asio;
class BoostClient
{
public:
BoostClient(io_service& service, ip::tcp::endpoint ep);
void AsyncRecvMessage();
void AsyncSendMessage(std::string message);
private:
bool bConnected;
// io_service &ios;
io_service &m_ios;
std::shared_ptr<ip::tcp::socket> pSocket;
std::array<char, 1024> data_;
};
main.cpp
#include <iostream>
#include "BoostClient.h"
#define DEFAULT_SERVER_IP "127.0.0.1"
#define DEFAULT_SERVER_PORT 8888
int main(int argc, char **argv)
{
ip::tcp::endpoint ep(ip::address_v4::from_string(DEFAULT_SERVER_IP), DEFAULT_SERVER_PORT);
io_service service;
BoostClient client(service, ep);
client.AsyncSendMessage("hello, server this is message.");
client.AsyncRecvMessage();
service.run();
return 0;
}
CMakeLists.txt\
cmake_minimum_required(VERSION 3.0.0)
project(boost_client)
add_executable(boost_client
main.cpp BoostClient.cpp
)
#boost库检查
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
include_directories(${
Boost_INCLUDE_DIRS})
link_directories(${
Boost_LIBRARY_DIRS})
else()
message("check boost lib failed!")
endif()
target_link_libraries(${
PROJECT_NAME}
pthread libboost_thread.a libboost_filesystem.a libboost_log_setup.a libboost_log.a libboost_locale.a
libboost_coroutine.a libboost_context.a
)
2.2 服务端代码
BoostServer.h
#include <memory>
#include <iostream>
#include <array>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace boost::asio;
typedef std::function<void(void)> Callback_Close;
typedef std::function<void(std::string)> Callback_SendMessage;
// 接收客户端发送来的消息
class Session_Recv : public std::enable_shared_from_this<Session_Recv>
{
public:
Session_Recv(ip::tcp::socket socket);
void SetClose(Callback_Close pClose);
void SetMsg_Ntf(Callback_SendMessage pMsg);
void async_read();
bool GetState();
private:
ip::tcp::socket socket_;
std::array<char, 1024> data_;
std::atomic<bool> connected_;
Callback_Close ptr_close_;
Callback_SendMessage ptr_message_;
};
// 向客户端发送消息
class Session_Send : public std::enable_shared_from_this<Session_Send>
{
public:
Session_Send(ip::tcp::socket socket);
void SetClose(Callback_Close pClose);
void async_send(std::string message);
bool GetState();
private:
ip::tcp::socket socket_;
std::atomic<bool> connected_;
Callback_Close ptr_close_;
};
// 服务器 接收port_recv端口发来的数据 向port_send端口发送数据
class BoostServer
{
public:
BoostServer(boost::asio::io_context &ctx, short port_recv, short port_send, std::string ip_recv = "", std::string ip_send = "");
void Close_Session_Recv();
void Close_Session_Send();
void GetContent(std::string content);
private:
void Async_Accept_Recv();
void Async_Accept_Send();
void SendMessage();
private:
std::shared_ptr<std::thread> ptr_Thread_Msg;
std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Recv_;
std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Send_;
std::shared_ptr<ip::tcp::endpoint> ptr_Recv_ep;
std::shared_ptr<ip::tcp::endpoint> ptr_Send_ep;
std::vector<std::shared_ptr<Session_Recv>> clients_recv;
std::vector<std::shared_ptr<Session_Send>> clients_send;
std::queue<std::string> Array_messages;
std::atomic<bool> bStopSend;
std::mutex mtx;
};
BoostServer.cpp
#include "boostServer.h"
Session_Recv::Session_Recv(ip::tcp::socket socket):socket_(std::move(socket))
{
connected_ = true;
ptr_close_ = nullptr;
ptr_message_ = nullptr;
}
void Session_Recv::SetClose(Callback_Close pClose)
{
ptr_close_ = pClose;
}
void Session_Recv::SetMsg_Ntf(Callback_SendMessage pMsg)
{
ptr_message_ = pMsg;
}
void Session_Recv::async_read()
{
auto self(shared_from_this());
memset(data_.data(), 0, sizeof(data_));
// 读操作完成时回调该函数, 读取到一些数据就会触发回调
socket_.async_read_some(buffer(data_), [this, self](const boost::system::error_code &ec, size_t bytes) {
if (!ec)
{
if(ptr_message_)
{
ptr_message_(std::string(data_.data(), bytes));
}
std::cout << "size:"<< bytes<<std::endl;
async_read();
}
else if(ec == error::eof)
{
// 断开连接
std::cout <<"read disconnect:"<<ec.message() << std::endl;
connected_ = false;
if(ptr_close_)
{
ptr_close_();
}
}
else
{
std::cout<<"read error:"<<ec.message()<<std::endl;
}
});
}
bool Session_Recv::GetState()
{
return connected_;
}
Session_Send::Session_Send(ip::tcp::socket socket):socket_(std::move(socket))
{
connected_ = true;
ptr_close_ = nullptr;
}
void Session_Send::SetClose(Callback_Close pClose)
{
ptr_close_ = pClose;
}
void Session_Send::async_send(std::string message)
{
if(message.empty() || !connected_)
{
return;
}
auto self(shared_from_this());
async_write(socket_, buffer(message.c_str(), message.size()), [this, self](const boost::system::error_code &ec, size_t writed_bytes)
{
if (!ec)
{
// async_read();
}
else
{
std::cout <<"send error:"<<ec.message() << std::endl;
connected_ = false;
if(ptr_close_)
{
ptr_close_();
}
}
});
}
bool Session_Send::GetState()
{
return connected_;
}
BoostServer::BoostServer(boost::asio::io_context &ctx, short port_recv, short port_send, std::string ip_recv, std::string ip_send)
{
ptr_Thread_Msg.reset(new std::thread(std::bind(&BoostServer::SendMessage, this)));
if("" == ip_recv)
{
ptr_Recv_ep.reset(new ip::tcp::endpoint(ip::tcp::v4(), port_recv));
}
else
{
ptr_Recv_ep.reset(new ip::tcp::endpoint(ip::address::from_string(ip_recv), port_recv));
}
Acceptor_Recv_.reset(new ip::tcp::acceptor(ctx, *ptr_Recv_ep));
if("" == ip_send)
{
ptr_Send_ep.reset(new ip::tcp::endpoint(ip::tcp::v4(), port_send));
}
else
{
ptr_Send_ep.reset(new ip::tcp::endpoint(ip::address::from_string(ip_send), port_send));
}
Acceptor_Send_.reset(new ip::tcp::acceptor(ctx, *ptr_Send_ep));
clients_recv.clear();
clients_send.clear();
Async_Accept_Recv();
Async_Accept_Send();
}
void BoostServer::Async_Accept_Recv()
{
Acceptor_Recv_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
{
if (!ec)
{
auto ptr = std::make_shared<Session_Recv>(std::move(socket));
ptr->SetClose(std::bind(&BoostServer::Close_Session_Recv, this));
ptr->SetMsg_Ntf(std::bind(&BoostServer::GetContent, this, std::placeholders::_1));
ptr->async_read();
clients_recv.push_back(ptr);
}
Async_Accept_Recv();
});
}
void BoostServer::Async_Accept_Send()
{
Acceptor_Send_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
{
if (!ec)
{
auto ptr = std::make_shared<Session_Send>(std::move(socket));
ptr->SetClose(std::bind(&BoostServer::Close_Session_Send, this));
clients_send.push_back(ptr);
}
Async_Accept_Send();
});
}
void BoostServer::Close_Session_Recv()
{
for(auto ptr = clients_recv.begin(); ptr != clients_recv.end();)
{
if (false == ptr->get()->GetState())
{
ptr->reset();
clients_recv.erase(ptr);
continue;
}
ptr++;
}
std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}
void BoostServer::Close_Session_Send()
{
for(auto ptr = clients_send.begin(); ptr != clients_send.end();)
{
if(false == ptr->get()->GetState())
{
ptr->reset();
clients_send.erase(ptr);
continue;
}
ptr++;
}
std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}
void BoostServer::GetContent(std::string content)
{
if(content.empty())
{
return;
}
if(mtx.try_lock())
{
Array_messages.push(content);
mtx.unlock();
}
}
void BoostServer::SendMessage()
{
std::string message = "";
bStopSend = false;
while(!bStopSend)
{
if(mtx.try_lock())
{
if(!Array_messages.empty())
{
message = Array_messages.front();
Array_messages.pop();
}
mtx.unlock();
}
if(!message.empty())
{
for(auto ptr = clients_send.begin(); ptr != clients_send.end(); ptr ++)
{
std::cout << "send message" << std::endl;
ptr->get()->async_send(message);
}
message.clear();
continue;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
main.cpp
#include "boostServer.h"
#define DEFAULT_SERVER_IP "127.0.0.1"
#define DEFAULT_SERVER_PORT1 8888
#define DEFAULT_SERVER_PORT2 9999
int main(int argc, char **argv)
{
boost::asio::io_context ctx;
BoostServer server(ctx, DEFAULT_SERVER_PORT1, DEFAULT_SERVER_PORT2, DEFAULT_SERVER_IP, DEFAULT_SERVER_IP);
ctx.run();
return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.0.0)
project(boostServer VERSION 0.1.0)
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
#boost库检查
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
else()
message("check boost lib failed!")
endif()
add_executable(boostServer main.cpp
boostServer.h boostServer.cpp
)
target_link_libraries(${PROJECT_NAME}
pthread libboost_thread.a libboost_filesystem.a libboost_log_setup.a libboost_log.a libboost_locale.a
libboost_coroutine.a libboost_context.a
)