Asynchronous ---> Channel
- 重点
- AsynchronousSocketChannel 和 AsynchronousServerSocketChannel
- 支持TCP的异步Channel
-
AsynchronousSocketChannel
- 负责监听的Channel
- 创建方法:
- 调用AsynchronousServerSocketChannel.open()静态方法返回该类的实例
- 调用bind()实例方法指定监听的地址和端口号
- open() 有两种
- open() 创建默认的AsynchronousServerSocketChannel
- open(AsynchronousChannelGroup g) : 使用指定的AsynchronousChannelGroup来创建AsynchronousServerSocketChannel。
- AsynchronousChannelGroup
- 是一个异步的Channel的分组管理器,可以实现资源共享。
- 创建AsynchronousChannelGroup 时需要传入一个ExecutorService.实质时绑定一个线程池。该线程池负责处理IO事件和触发CompletionHander()处理器
- accept()接受客户端的链接
- Future accept(): 接受客户端请求,若需要返回AsynchronousSocketChannel,则应该调用Future的get()方法。get()方法会阻塞线程,等待返回AsynchronousSocketChannel。因此这种方式依然会阻塞线程(必须等待get()方法返回)
- void accpt(A attachment ,CompletionHander hander ) : 接受请求,不管成功还是失败都会触发CompletionHander的相应方法 (真的实现了异步)
- CompletionHander
- completed(V result, A attachment): io操作完成后触发,第一个参数代表IO操作返回的对象
- failed (Throwable exc, A attachment) io失败触发,第一个参数代表IO操作返回的异常或错误
- 另外不只accept()可以接受CompletionHander监听器,AsynchronousSocketChannel的connect() read(),write()方法都有两个版本,其中一个可以接受CompletionHander,和accept() 相同,默认情况都是阻塞的,需要调用get(),并返回
-
AsynchronousSocketChannel
- 创建 : open()和AsynchronousServerSocketChannel的open相同
- 调用connect()方法链接到指定ip和端口
- 调用 read() write() 方法读写
使用 无参 open()创建默认AsynchronousServerSocketChannel 和阻塞式的 accept()
public class SimpleAIOServer
{
static final int PORT = 30000;
public static void main(String[] args)
throws Exception
{
try(
// ①创建AsynchronousServerSocketChannel对象。
AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open())
{
// ②指定在指定地址、端口监听。
serverChannel.bind(new InetSocketAddress(PORT));
while (true)
{
// ③采用循环接受来自客户端的连接
Future<AsynchronousSocketChannel> future
= serverChannel.accept();
// 获取连接完成后返回的AsynchronousSocketChannel
AsynchronousSocketChannel socketChannel = future.get();
// 执行输出。
socketChannel.write(ByteBuffer.wrap("欢迎你来自AIO的世界!"
.getBytes("UTF-8"))).get();
}
}
}
}
使用 无参 open()创建默认AsynchronousSocketChannel 和阻塞式的 connect()
public class SimpleAIOClient
{
static final int PORT = 30000;
public static void main(String[] args)
throws Exception
{
// 用于读取数据的ByteBuffer。
ByteBuffer buff = ByteBuffer.allocate(1024);
Charset utf = Charset.forName("utf-8");
try(
// ①创建AsynchronousSocketChannel对象
AsynchronousSocketChannel clientChannel
= AsynchronousSocketChannel.open())
{
// ②连接远程服务器
clientChannel.connect(new InetSocketAddress("127.0.0.1"
, PORT)).get(); // ④
buff.clear();
// ③从clientChannel中读取数据
clientChannel.read(buff).get(); // ⑤
buff.flip();
// 将buff中内容转换为字符串
String content = utf.decode(buff).toString();
System.out.println("服务器信息:" + content);
}
}
}
服务器端使用带AsynchronousChannelGroup分组管理的open()和带CompletionHander监听处理器的accept(),read()方法。并分别重写了 accept() 和read()方法需要的CompletionHander监听处理器
另外再准备客户端下一次链接的地方,会嵌套监听端口,即accept()链接成功后会触发CompletionHander,而CompletionHander中又开启了监听下一次。若再有客户端接入,将会再次触发CompletionHander。
public class AIOServer
{
static final int PORT = 30000;
final static String UTF_8 = "utf-8";
static List<AsynchronousSocketChannel> channelList
= new ArrayList<>();
public void startListen() throws InterruptedException,
Exception
{
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(20);
// 以指定线程池来创建一个AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup
.withThreadPool(executor);
// 以指定线程池来创建一个AsynchronousServerSocketChannel
AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open(channelGroup)
// 指定监听本机的PORT端口
.bind(new InetSocketAddress(PORT));
// 使用CompletionHandler接受来自客户端的连接请求
serverChannel.accept(null, new AcceptHandler(serverChannel)); // ①
Thread.sleep(100000);
}
public static void main(String[] args)
throws Exception
{
AIOServer server = new AIOServer();
server.startListen();
}
}
// 实现自己的CompletionHandler类
class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel, Object>
{
private AsynchronousServerSocketChannel serverChannel;
public AcceptHandler(AsynchronousServerSocketChannel sc)
{
this.serverChannel = sc;
}
// 定义一个ByteBuffer准备读取数据
ByteBuffer buff = ByteBuffer.allocate(1024);
// 当实际IO操作完成时候触发该方法
@Override
public void completed(final AsynchronousSocketChannel sc
, Object attachment)
{
// 记录新连接的进来的Channel
AIOServer.channelList.add(sc);
// 准备接受客户端的下一次连接
serverChannel.accept(null , this);
sc.read(buff , null
, new CompletionHandler<Integer,Object>() // ②
{
@Override
public void completed(Integer result
, Object attachment)
{
buff.flip();
// 将buff中内容转换为字符串
String content = StandardCharsets.UTF_8
.decode(buff).toString();
// 遍历每个Channel,将收到的信息写入各Channel中
for(AsynchronousSocketChannel c : AIOServer.channelList)
{
try
{
c.write(ByteBuffer.wrap(content.getBytes(
AIOServer.UTF_8))).get();
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
buff.clear();
// 读取下一次数据
sc.read(buff , null , this);
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("读取数据失败: " + ex);
// 从该Channel读取数据失败,就将该Channel删除
AIOServer.channelList.remove(sc);
}
});
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("连接失败: " + ex);
}
}
客户端程序同理(该程序有图形界面)
public class AIOClient
{
final static String UTF_8 = "utf-8";
final static int PORT = 30000;
// 与服务器端通信的异步Channel
AsynchronousSocketChannel clientChannel;
JFrame mainWin = new JFrame("多人聊天");
JTextArea jta = new JTextArea(16 , 48);
JTextField jtf = new JTextField(40);
JButton sendBn = new JButton("发送");
public void init()
{
mainWin.setLayout(new BorderLayout());
jta.setEditable(false);
mainWin.add(new JScrollPane(jta), BorderLayout.CENTER);
JPanel jp = new JPanel();
jp.add(jtf);
jp.add(sendBn);
// 发送消息的Action,Action是ActionListener的子接口
Action sendAction = new AbstractAction()
{
public void actionPerformed(ActionEvent e)
{
String content = jtf.getText();
if (content.trim().length() > 0)
{
try
{
// 将content内容写入Channel中
clientChannel.write(ByteBuffer.wrap(content
.trim().getBytes(UTF_8))).get(); //①
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
// 清空输入框
jtf.setText("");
}
};
sendBn.addActionListener(sendAction);
// 将Ctrl+Enter键和"send"关联
jtf.getInputMap().put(KeyStroke.getKeyStroke('\n'
, java.awt.event.InputEvent.CTRL_DOWN_MASK) , "send");
// 将"send"和sendAction关联
jtf.getActionMap().put("send", sendAction);
mainWin.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
mainWin.add(jp , BorderLayout.SOUTH);
mainWin.pack();
mainWin.setVisible(true);
}
public void connect()
throws Exception
{
// 定义一个ByteBuffer准备读取数据
final ByteBuffer buff = ByteBuffer.allocate(1024);
// 创建一个线程池
ExecutorService executor = Executors.newFixedThreadPool(80);
// 以指定线程池来创建一个AsynchronousChannelGroup
AsynchronousChannelGroup channelGroup =
AsynchronousChannelGroup.withThreadPool(executor);
// 以channelGroup作为组管理器来创建AsynchronousSocketChannel
clientChannel = AsynchronousSocketChannel.open(channelGroup);
// 让AsynchronousSocketChannel连接到指定IP、指定端口
clientChannel.connect(new InetSocketAddress("127.0.0.1"
, PORT)).get();
jta.append("---与服务器连接成功---\n");
buff.clear();
clientChannel.read(buff, null
, new CompletionHandler<Integer,Object>() //②
{
@Override
public void completed(Integer result, Object attachment)
{
buff.flip();
// 将buff中内容转换为字符串
String content = StandardCharsets.UTF_8
.decode(buff).toString();
// 显示从服务器端读取的数据
jta.append("某人说:" + content + "\n");
buff.clear();
clientChannel.read(buff , null , this);
}
@Override
public void failed(Throwable ex, Object attachment)
{
System.out.println("读取数据失败: " + ex);
}
});
}
public static void main(String[] args)
throws Exception
{
AIOClient client = new AIOClient();
client.init();
client.connect();
}
}