Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)
什么是AIO模型?
功能
这里实现一个很简单的客户端-服务器,客户端连接服务器后,发送消息给服务器,服务器直接返回该消息给客户端即可,这只是个尝鲜版,以后也会用AIO模型来实现多人聊天室。
服务器
服务器的主要逻辑如下:
- 打开管道
AsynchronousServerSocketChannel
,绑定、监听端口(启动服务器)。 - 服务器异步调用
accept
(通过CompletionHandler
的实现类AcceptHandler
来实现异步调用,因为有回调机制),使用System.in.read()
来减少while
循环中频繁异步调用accept
。 - 当有客户端连接成功后,触发回调函数
completed
(AcceptHandler
类),然后再异步调用一次accept
,等待下一个客户端连接成功时触发(先存着);回调函数completed
会把AsynchronousSocketChannel
传过来,调用AsynchronousSocketChannel
的read
方法,来读取客户端发送过来的消息。 - 将
AsynchronousSocketChannel
的read
和write
方法实现异步调用(通过CompletionHandler
的实现类ClientHandler
来实现异步调用,因为有回调机制),为了分清楚是read
还是write
,attachment
传一个Map
(Map里面记录必要数据buffer
、type
)。当读取客户端发送的消息后,再将该消息发送给客户端(都是通过AsynchronousSocketChannel
,所以将它定义为ClientHandler
类的属性),即异步调用read
,触发回调函数(read
的回调函数),在回调函数里面再异步调用write
,触发回调函数(write
的回调函数),在回调函数里面再异步调用read
。
可能会看起来懵懵懂懂,可以结合代码来理解,代码有一定的注释。
服务器完整代码:
package aio.test;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
public class Server {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousServerSocketChannel serverChannel;
private void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
// 打开通道
serverChannel = AsynchronousServerSocketChannel.open();
// 绑定、监听端口
serverChannel.bind(new InetSocketAddress(DEFAULT_PORT));
System.out.println("启动服务器,监听端口:"+DEFAULT_PORT+"...");
while(true){
// 异步调用
serverChannel.accept(null,new AcceptHandler());
// 不会频繁调用accept的小技巧
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
} finally{
close(serverChannel);
}
}
private class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel , Object> {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
//此次调用accept完成,再一次调用accept,等待下一个客户端连接
if(serverChannel.isOpen()) {
serverChannel.accept(null , this);
}
AsynchronousSocketChannel clientChannel = result;
if(clientChannel != null && clientChannel.isOpen()){
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Map<String , Object> info = new HashMap<>();
info.put("type", "read");
info.put("buffer" , buffer);
clientChannel.read(buffer , info, handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 处理错误
}
}
private class ClientHandler implements
CompletionHandler<Integer , Object>{
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel channel){
this.clientChannel = channel;
}
@Override
public void completed(Integer result, Object attachment) {
Map<String , Object> info = (Map<String, Object>) attachment;
String type = (String) info.get("type");
if(type.equals("read")){
ByteBuffer buffer = (ByteBuffer) info.get("buffer");
// 读模式
buffer.flip();
info.put("type" , "write");
clientChannel.write(buffer ,info , this);
// 写模式(也相当于清空)
buffer.clear();
}
else if(type.equals("write")){
ByteBuffer buffer = ByteBuffer.allocate(1024);
info.put("type" , "read");
info.put("buffer" , buffer);
clientChannel.read(buffer , info , this);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 处理错误
}
}
public static void main(String[] args) {
Server server = new Server();
server.start();
}
}
客户端
为了让大家了解其他实现异步调用的方法,客户端使用Future
来实现异步调用(正如该单词的英文意思将来
、未来
)。
Future
实现简单的异步调用,会简单一点。
调用AIO模型组件的一些方法会返回一个Future
实例,再调用该实例的get
方法会阻塞,等待该方法调用完成,阻塞便解除(具体看代码吧,客户端比较简单,注释还是比较全的)。
package aio.test;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousSocketChannel clientChannel;
private void close(Closeable closeable){
if(closeable != null){
try {
closeable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start(){
try {
// 创建Channel
clientChannel = AsynchronousSocketChannel.open();
Future<Void> future = clientChannel.connect(
new InetSocketAddress(LOCALHOST , DEFAULT_PORT));
// 当未连接成功前,这里是阻塞的
future.get();
// 等待用户的输入
BufferedReader consoleReader =
new BufferedReader(new InputStreamReader(System.in));
while(true){
String input = consoleReader.readLine();
byte[] inputBytes = input.getBytes();
// 得到buffer的模式是读模式,可以Debug看一看
ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
Future<Integer> writeResult = clientChannel.write(buffer);
// 等待成功写入用户管道
writeResult.get();
// 写模式
buffer.clear();
Future<Integer> readResult = clientChannel.read(buffer);
// 等待成功读取用户管道
readResult.get();
String echo = new String(buffer.array());
System.out.println(echo);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
close(clientChannel);
}
}
public static void main(String[] args) {
Client client = new Client();
client.start();
}
}
测试
测试没什么问题。
这里便完成了AIO模型的简易客户端-服务器,大家可以动手试一试,使用两种实现异步调用的方法。
如果有说错的地方,请大家不吝赐教(记得留言哦~~~~)。