一、传统BIO
网络编程基本模型Client/Server模型,即两个进程之间的互相通信,服务端提供位置信息(binding IP & Port) ,客户端通过连接操作向服务端监听地址发起连接, 三次握手建立连接,若成功,双方基于Socket通信。
传统BIO模型:
ServerSocket:绑定IP,启动监听port;
Socket:发起连接。成功后,双方通过输入输出流同步阻塞通信。
1、BIO通信模型
BIO通信模型问题:
每来一个新的客户端请求,服务端必须创建一个新线程处理。一个请求对应一个线程,不能满足高性能的要求。
// 服务端
public class TimeServer {
public static void main(String[] args) {
/*设默认端口*/
int port = 8080;
if (args != null && args.length > 0) {
port = Integer.parseInt(args[0]);
}
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
System.out.println(" time server start in port " + port);
Socket socket;
/*监听客户端连接,若无,主线程阻塞在accpet上*/
while (true) {
socket = serverSocket.accept();
/*每来一个新客户端请求都创建一个新线程处理*/
new Thread(new TimeServerHandler(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
/* 关闭资源*/
if (serverSocket != null) {
System.out.println(" time server close ");
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// 处理socket的线程
@AllArgsConstructor
public class TimeServerHandler implements Runnable {
private Socket socket;
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
while (true) {
String line = in.readLine();
if (Objects.isNull(line)) {
break;
}
System.out.println(" time sever get order : " + line);
final String currentTime = "query time order".equalsIgnoreCase(line) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
out.println(currentTime);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out != null) {
out.close();
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
// 客户端
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
port = Integer.valueOf(args[0]);
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("query time order");
System.out.println(" send order 2 server succeed");
String response = in.readLine();
System.out.println("NOW is :" + response);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (out != null) {
out.close();
}
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
为改善这种一个连接一个线程的模型,引入线程池或消息队列实现1个或多个线程N个客户端的模型,但底层通信使用同步阻塞IO,称“伪异步”
2 伪异步
为解决同步阻塞IO面临的一个链路需要一个线程处理的问题,可能用线程池模型进行优化—-请求接入后扔到线程池中处理,形成客户端数M: 线程池最大线程数N的比例关系,M可远大于N。通过线程池可灵活调配线程资源,设置maxPoolSize,防止高并发导致线程资源用尽。
模型:
// 伪异步时间服务器
public class FakeAsyncTimeServer {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
port = Integer.parseInt(args[0]);
}
try(ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println(" time server starts in port :" + port);
/*请求处理线程池:JDK线程池维护一个消息队列和N个活跃线程,对消息队列中任务进行处理*/
TimeServerHandlerThreadPool threadPool = new TimeServerHandlerThreadPool(50, 10000);
Socket socket;
while (true) {
socket = serverSocket.accept();
/*客户端socket封装成了一个Task,实现Runnable接口*/
threadPool.execute(new TimeServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
// 线程池
public class TimeServerHandlerThreadPool {
private ExecutorService executorService;
public TimeServerHandlerThreadPool(int maxPoolSize, int qSize) {
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(qSize));
}
public void execute(Runnable task) {
executorService.execute(task);
}
}
伪异步存在着弊端:
1) 读socket,read()会阻塞,除非* 有数据可读,或可用数据已读完,或发生异常*
InputStream# read
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
就是说:假如发请求或应答慢,或网络慢,读的一方的通信线程会被长时间阻塞,对方要60s完成,读的一方也阻塞60s,在此期间,其他接入消息只能在队列中排队。
2)写socket,write()会阻塞,除非所有要写的字节写完,或发生异常。
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
根据TCP、IP,消息的接收方处理缓慢时,将不能及时从TCP缓冲区读,导致发送方的TCP window size 减小,直到0,双方处在keep-alive状态,发送方不会再向TCP缓冲写入消息,
此时若采用同步阻塞IO,write会无限阻塞,直到TCP window size > 0,或IO异常。
可见:即使是伪异步,读、写都是同步阻塞的,阻塞时间取决于对方IO线程处理速度和网络IO传输速度。实际生产中,网络可靠性恶劣,问题可能爆发。
伪异步是BIO简单优化,并不能从根本上解决同步IO导致通信线程阻塞的问题。常见一种通信对方返回应答时间过长而导致的级联故障:
1)服务端处理慢,返回应答要60s,平时10ms
2)伪异步IO线程正在读故障服务节点响应,由于读取输入流是阻塞的,该线程会被阻塞60s
3)若所有可用线程被故障服务器阻塞,后续所有IO消息都在队列排队
4)线程池的阻塞队列打满,后续入队列操作被阻塞
5)前端的Acceptor线程接收客户端接入,被阻塞在线程池的同步阻塞队列之后,新的客户端请求被拒绝,客户端发生大面积超时
6)几乎所有连接都超时,调用者认为系统崩溃,不能接受新请求。
一言以蔽之:60s的应答时间太长,线程池被打满,新请求不能被处理,caller认为服务器奔溃。(跟DB 连接池被打满,新请求拿不到connection,等待超时后,一片告警很类似)