Boost.Asio库HTTPServer例子解读(一)——使用io_context-per-CPU设计的HTTP服务器
文章目录
模型优势
每个io_context都分配一个CPU可以使得每个io_context的线程没有锁,充分发挥多线程优势。程序给每个连接都分配一个io_context。
主要的类
重要实现
io_context_poll实现了io_context池
- io_context_poll会循环使用队列里的io_context对象,也相当于负载平衡
- 使用boost::asio::make_work_guard保证io_context没有任务时不会退出
- server的构造函数里直接建立连接,run函数调用io_context_poll::run把io_context的线程全部启动
connection负责socket和建立连接
- 主逻辑和其他异步程序一样,start_accept()和handle_accept()反复互相调用
- 每次建立新的socket就从io_context_poll取出一个io_context
- 给每个连接都分配一个io_context,相当于每个连接都分配一个线程
官方代码代码地址
https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/examples/cpp03_examples.html
简化后的代码如下
客户端
#include <string>
#include <iostream>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
enum {
max_length = 1024
};
int main(int argc, char *argv[]) {
try {
if (argc != 4) {
std::cerr << "Usage: blocking_tcp_echo_client <host> <port>\n";
return 1;
}
boost::asio::io_context io_context;
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(s, resolver.resolve(argv[1], argv[2]));
std::string request(argv[3]);
boost::asio::write(s, boost::asio::buffer(request, request.size()));
char reply[max_length];
size_t reply_length = boost::asio::read(s,
boost::asio::buffer(reply, request.size()));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
// s.close();
}
catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
服务器端
#include <iostream>
#include <memory>
#include <utility>
#include <vector>
#include <chrono>
#include <boost/asio.hpp>
#include "io_context_pool.hpp"
/*
* server负责连接
* session负责产生socket和读写
*/
using boost::asio::ip::tcp;
class session : public std::enable_shared_from_this<session>, private boost::noncopyable {
public:
session(boost::asio::io_context &ioc) : socket_(ioc) {}
~session() {
std::cout << "~session() " << std::this_thread::get_id() << std::endl;
}
void start() {
do_read();
}
boost::asio::ip::tcp::socket &socket() {
return socket_;
}
private:
void do_read() {
std::cout << "do_read() " << std::endl;
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(buffer_, max_length),
[self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
self->do_write(length);
} else {
std::cout << "ERROR: " << ec.message() << std::endl;
return;
}
});
}
void do_write(std::size_t length) {
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(buffer_),
[self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec) {
//self->do_read();
//这里需要解释一下,如果一次读写后不关闭socket,那么再次读的时候服务器端会产生ERROR: End of file错误,虽然不会崩溃,原因是客户端没有数据发送了
boost::system::error_code ignored_ec;
self->socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both,
ignored_ec);
} else {
std::cout << "ERROR: " << ec.message() << std::endl;
return;
}
});
}
tcp::socket socket_;
enum {
max_length = 1024
};
char buffer_[max_length];
// std::vector<char> buffer_;
};
using session_sptr = std::shared_ptr<session>;
class server {
public:
server(const std::string &address, const std::string &port, size_t io_context_pool_size)
: ioc_pool_(io_context_pool_size),
acceptor_(ioc_pool_.get_io_context()) {
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(acceptor_.get_executor());
boost::asio::ip::tcp::endpoint endpoint =
*resolver.resolve(address, port).begin();
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
do_accept();
}
~server() { ioc_pool_.stop(); }
public:
void run() {
ioc_pool_.run();
}
private:
void do_accept() {
new_session_.reset(new session(ioc_pool_.get_io_context()));
acceptor_.async_accept(new_session_->socket(),
[this](boost::system::error_code ec) {
if (!ec) {
new_session_->start();
do_accept();
} else {
std::cout << ec.message() << std::endl;
return;
}
});
}
private:
io_context_pool ioc_pool_;
tcp::acceptor acceptor_;
session_sptr new_session_;
};
int main(int argc, char *argv[]) {
try {
if (argc != 2) {
std::cerr << "Usage: server <port>\n";
return 1;
}
server s("127.0.0.1", argv[1], 7);
s.run();
}
catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
io_context_pool
//io_context_pool.hpp
#ifndef IO_CONTEXT_POOL_H
#define IO_CONTEXT_POOL_H
#include <boost/asio.hpp>
#include <list>
#include <vector>
#include <boost/noncopyable.hpp>
/// A pool of io_context objects.
class io_context_pool : private boost::noncopyable {
public:
/// Construct the io_context pool.
explicit io_context_pool(std::size_t pool_size);
/// Run all io_context objects in the pool.
void run();
/// Stop all io_context objects in the pool.
void stop();
/// Get an io_context to use.
boost::asio::io_context &get_io_context();
private:
typedef std::shared_ptr<boost::asio::io_context> io_context_sptr;
typedef boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_context_work;
/// The pool of io_contexts.
std::vector<io_context_sptr> io_contexts_;
/// The work that keeps the io_contexts running.
std::list<io_context_work> work_;
/// The next io_context to use for a connection.
std::size_t next_io_context_;
};
#endif /*IO_CONTEXT_POOL_H*/
//io_context_pool.cpp
#include "io_context_pool.hpp"
#include <stdexcept>
#include <thread>
#include <memory>
#include <boost/bind.hpp>
io_context_pool::io_context_pool(std::size_t pool_size) : next_io_context_(0) {
if (pool_size == 0)
throw std::runtime_error("io_context_pool size is 0");
// Give all the io_contexts work to do so that their run() functions will not
// exit until they are explicitly stopped.
for (std::size_t i = 0; i < pool_size; ++i) {
io_context_sptr io_context(new boost::asio::io_context);
io_contexts_.push_back(io_context);
work_.push_back(boost::asio::make_work_guard(*io_context));
}
}
void io_context_pool::run() {
// Create a pool of threads to run all of the io_contexts.
std::vector<std::shared_ptr<std::thread> > threads;
for (std::size_t i = 0; i < io_contexts_.size(); ++i) {
std::shared_ptr<std::thread> thread(
new std::thread(boost::bind(&boost::asio::io_context::run, io_contexts_[i])));
threads.push_back(thread);
}
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i]->join();
}
void io_context_pool::stop() {
// Explicitly stop all io_contexts.
for (std::size_t i = 0; i < io_contexts_.size(); ++i)
io_contexts_[i]->stop();
}
boost::asio::io_context &io_context_pool::get_io_context() {
// Use a round-robin scheme to choose the next io_context to use.
boost::asio::io_context &io_context = *io_contexts_[next_io_context_];
++next_io_context_;
if (next_io_context_ == io_contexts_.size())
next_io_context_ = 0;
return io_context;
}