/** * 非阻塞IO多线线程服务端 * 当一个任务进入多线程,这个任务线程需要处理接收信息、发送信息、因而发生I/O阻塞问题 * 利用selector可以实现异步 * */ public class EchoServer02 { //轮询器,处理IO阻塞问题 private Selector selector = null; private ServerSocketChannel serverSocketChannel = null; private int port = 8088; private Charset charset = Charset.forName("GBK");//编码方式 public EchoServer02() throws IOException{ //创建一个Selector对象 selector = Selector.open(); //这个方法没有与任何本地端口绑定,并且处于阻塞模式; serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false);// IO异步处理 //把服务器与本地端口绑定 serverSocketChannel.socket().bind(new InetSocketAddress(port));//绑定服务器端口 System.out.println("服务器已启动"); } public void service() throws Exception{ /*SeverSocketChannel或Socket类通过register()方法向Selector注册事件时, register()方法会创建一个SelectionKey对象, 这个SelectionKey对象是用来跟踪注册事件的句柄。 在SelectionKey对象的有效时间,Selector会一直监控与SelectionKey对象相关的事件, 如果事件发生,就会把SelectionKey对象加入seleected-keys集合中。*/ //将ServerSocketChannel注册到Selector上 //只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发送 //SelecitonKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个链接了。常量值为16 //这个客户SocketChannel会被Selector监控到 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //获取Selector中的SelectionKey数量 while(selector.select() > 0){//第一层循环 //相关事件已经被Selector捕获的SelectionKey的集合。 Set readKeys = selector.selectedKeys(); Iterator iterator = readKeys.iterator(); while(iterator.hasNext()){//第二层循环 SelectionKey key = null; try{//处理SelectionKey key = (SelectionKey)iterator.next();//取出一个SelectionKey //把SelectionKey从Selector的selected-集合中删除 iterator.remove(); if(key.isAcceptable()){//处理连接就绪事件 //获得与SelectionKey关联的ServerSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //获得与客户连接的SocketChannel,这个SocketChannel默认情况是阻塞的 SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("接收到客户的连接,来自:"+socketChannel.socket().getInetAddress() +":"+socketChannel.socket().getPort()); //把SocketChannel设置为非阻塞模式, socketChannel.configureBlocking(false); //创建一个用于存放用户发送来的数据的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //把SocketChannel向Selector注册就读事件和就绪事件,且关联了一个buffer附件 socketChannel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer); } if(key.isReadable()){//处理读就绪事件 receive(key); } if(key.isWritable()){//处理写就绪事件 send(key); } }catch (Exception e) { e.printStackTrace(); try { if(key != null){ //使这个Seleciton失效 //使得Selector不再监控这个SelectionKey感兴趣的事件 key.cancel(); key.channel().close();//关闭这个SelectionKey关联的SocketChannel } } catch (Exception e2) { e2.printStackTrace(); } } } } } public void send(SelectionKey key) throws IOException{ //获得与SelectionKey关联的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); //把极限设为位置,把位置设为0 buffer.flip(); //按照GBK编码,把buffer中的字节转换为字符串 String data = decode(buffer); //如果还没有读到一行数据就返回 if(data.indexOf("\r\n") == -1){ return ; } //截取一行数据 String outputData = data.substring(0, data.indexOf("\n")+1); System.out.println(outputData); //把输出的字符串安装GBK编码,转换为字节,把它放入outputBuffer ByteBuffer outputBuffer = encode("ehco:" + outputData); //输出outputBuffer中所有的字节 while(outputBuffer.hasRemaining()){ socketChannel.write(outputBuffer); //把outputData字符串按照GBK编码,转换为字节 ,把它放入ByteBuffer中 ByteBuffer temp = encode(outputData); //把buffer的位置设为temp的极限 buffer.position(temp.limit()); //删除buffer中已经处理的数据 buffer.compact(); //如果已经输出了字符串"bye\r\n",就使SelectionKey失效,并关闭SocketChannel if(outputData.equals("bye\r\n")){ key.cancel(); socketChannel.close(); System.out.println("关闭与客户的连接"); } } } /** * receive()方法把读入的数据都放在一个ByteBuffer中, * send()方法就从这个ByteBuffer中取出数据 * 如果ByteBuffer中还没有一行字符串,就什么不做,直接退出send()方法 * @param key * @throws IOException */ public void receive(SelectionKey key) throws IOException{ //获得与SelectionKey关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); //创建一个ByteBuffer,用于存放读到的数据 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的极限设为容量 buffer.limit(buffer.capacity()); //把readBuff中内容拷贝到buffer中, //假设buffer容量足够大,不会出现缓冲区溢出异常 buffer.put(readBuff); } public String decode(ByteBuffer buffer){//解码,将字节转换为字符串的过程 CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } public ByteBuffer encode(String str){//编码,将字符串转换为字节 return charset.encode(str); } public static void main(String[] args) throws Exception{ EchoServer02 server02 = new EchoServer02(); server02.service(); } }
java 服务端I/O非阻塞实现05
猜你喜欢
转载自www.cnblogs.com/lazyli/p/10786906.html
今日推荐
周排行