Java NIO ByteBuffer in Detail: http://donald-draper.iteye.com/blog/2357084
Java NIO Tutorial Series: http://www.iteye.com/magazines/132-Java-NIO
NIO-TCP Simple Example: http://donald-draper.iteye.com/admin/blogs/2369044
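Before this article, I implemented TCP communication with BIO in the article "Java Socket Communication Example". That article used BIO's ServerSocket and Socket to implement addition and multiplication. Today we implement the same two operations with NIO's ServerSocketChannel and SocketChannel. The protocol is essentially the same, with minor changes: a request is a 6-byte protocol code followed by two 4-byte integer operands, and a response is a 6-byte protocol code followed by one 4-byte integer.
Let's walk through the implementation.
Protocol constants class: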
package nio.socketchannel;

/**
 * Protocol constants.
 * @author donald
 * 2017-04-13 22:49:27
 */
public class ProtocolConstants {
    /** Protocol code: addition request */
    public static final String SUM_PROTOCOL_300000 = "300000";
    /** Protocol code: multiplication request */
    public static final String MULTI_PROTOCOL_300100 = "300100";
    /** Protocol code: calculation result */
    public static final String ACK_PROTOCOL_300200 = "300200";
    /** Protocol code: the server failed to decode the request */
    public static final String ACK_PROTOCOL_300300 = "300300";
    /** Length of a protocol code, in bytes */
    public static final int PROTOCOL_CODE_LENGTH = 6;
    /** Length of one operand, in bytes */
    public static final int OPERATE_NUM_LENGTH = 4;
    /** Character set */
    public static final String CHARSET_UTF8 = "UTF-8";
}
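To make the wire format implied by these constants concrete, here is a minimal sketch that packs one request into a single buffer and reads it back. FrameCodec and its method names are hypothetical helpers for illustration only; the listings in this article use two separate buffers instead of one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the article's listings.
class FrameCodec {
    static final int CODE_LENGTH = 6;     // mirrors PROTOCOL_CODE_LENGTH
    static final int OPERAND_LENGTH = 4;  // mirrors OPERATE_NUM_LENGTH

    /** Builds a 14-byte request: a 6-byte code followed by two big-endian ints. */
    static ByteBuffer encodeRequest(String code, int first, int second) {
        byte[] codeBytes = code.getBytes(StandardCharsets.UTF_8);
        if (codeBytes.length != CODE_LENGTH) {
            throw new IllegalArgumentException("protocol code must be 6 bytes");
        }
        ByteBuffer frame = ByteBuffer.allocate(CODE_LENGTH + 2 * OPERAND_LENGTH);
        frame.put(codeBytes).putInt(first).putInt(second);
        frame.flip(); // switch the buffer from writing to reading
        return frame;
    }

    /** Consumes the 6-byte protocol code from a frame that is ready for reading. */
    static String decodeCode(ByteBuffer frame) {
        byte[] codeBytes = new byte[CODE_LENGTH];
        frame.get(codeBytes);
        return new String(codeBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer frame = encodeRequest("300000", 15, 6);
        System.out.println("frame size: " + frame.remaining());
        System.out.println("code: " + decodeCode(frame));
        System.out.println("operands: " + frame.getInt() + ", " + frame.getInt());
    }
}
```

Because every field has a fixed length, the receiver knows exactly how many bytes make up a complete request and needs no delimiter.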
Server:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// ProtocolConstants is in this same package, so it needs no import.
public class NIOServerCalculate {
    private static final String HOST = "192.168.32.126";
    private static final int PORT = 10000;
    // manages the registered channels
    private Selector selector;

    /** Starts the server. */
    public static void main(String[] args) throws IOException {
        NIOServerCalculate server = new NIOServerCalculate();
        server.initServer(HOST, PORT);
        server.listen();
    }

    /** Opens the ServerSocketChannel, binds it, and registers it for OP_ACCEPT. */
    public void initServer(String host, int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false); // non-blocking mode
        serverChannel.socket().bind(new InetSocketAddress(host, port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /** Polls the selector and dispatches accept and read events. */
    public void listen() throws IOException {
        System.out.println("=========The Server is start!===========");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    System.out.println("=========channel is Connected:" + channel.isConnected());
                    System.out.println("=========channel is Open:" + channel.isOpen());
                    System.out.println("=========channel is ConnectionPending:" + channel.isConnectionPending());
                    // watch the new connection for incoming data; the string is an attachment
                    channel.register(this.selector, SelectionKey.OP_READ, "decodeProtol");
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /** Handles one request from a client. */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        String attachedInfo = (String) key.attachment();
        System.out.println("========socketChannel attachedInfo:" + attachedInfo);
        // protocol code
        ByteBuffer protocolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
        // protocol body: the two operands
        ByteBuffer dataBuffer = ByteBuffer.allocate(2 * ProtocolConstants.OPERATE_NUM_LENGTH);
        ByteBuffer[] protocols = new ByteBuffer[] {protocolCodeBuffer, dataBuffer};
        System.out.println("========read caculate proctol from Client=======");
        // Parse only after the whole request has arrived. A scattering read fills the
        // buffers in order, so a full dataBuffer implies a full protocolCodeBuffer.
        while (dataBuffer.hasRemaining()) {
            if (channel.read(protocols) < 0) {
                // read() returns -1 once the client has closed the connection
                key.cancel();
                channel.close();
                return;
            }
        }
        protocolCodeBuffer.flip();
        dataBuffer.flip();
        String protocolCode = new String(protocolCodeBuffer.array(), ProtocolConstants.CHARSET_UTF8).trim();
        String ackCode = ProtocolConstants.ACK_PROTOCOL_300200;
        int result = 0;
        if (protocolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)) {
            System.out.println("========the protocol is sum algorithm=======");
            int firstNum = dataBuffer.getInt();
            int secondNum = dataBuffer.getInt();
            System.out.println("operate num is:" + firstNum + "," + secondNum);
            // Addition. The listing as first posted multiplied here by mistake, which is
            // why the console output further down reports 90 for 15 and 6.
            result = firstNum + secondNum;
        } else if (protocolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)) {
            System.out.println("========the protocol is multiply algorithm=======");
            int firstNum = dataBuffer.getInt();
            int secondNum = dataBuffer.getInt();
            System.out.println("operate num is:" + firstNum + "," + secondNum);
            result = firstNum * secondNum;
        } else {
            System.out.println("========server decode procotol fail......");
            ackCode = ProtocolConstants.ACK_PROTOCOL_300300;
        }
        // Reuse both buffers for the reply: clear() resets position and limit,
        // put() overwrites the old bytes, flip() prepares them for channel.write().
        protocolCodeBuffer.clear();
        protocolCodeBuffer.put(ackCode.getBytes(ProtocolConstants.CHARSET_UTF8));
        protocolCodeBuffer.flip();
        dataBuffer.clear();
        dataBuffer.putInt(result);
        dataBuffer.flip();
        channel.write(protocols);
        // channel.shutdownOutput() (closes the write half; a later write throws) and
        // channel.close() are deliberately not called here during testing. When one
        // side calls either of them, the other side receives one more OP_READ event.
    }
}
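The server's read loop relies on a scattering read, channel.read(ByteBuffer[]), filling the buffers in order and possibly delivering a request in several pieces. The sketch below shows that behaviour without a network by using java.nio.channels.Pipe as a stand-in for a SocketChannel; ScatterReadDemo, readFully and demo are hypothetical names used for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; Pipe stands in for a TCP connection.
class ScatterReadDemo {
    /** Reads until the last buffer is full; returns false if the stream ends first. */
    static boolean readFully(ScatteringByteChannel in, ByteBuffer[] buffers) throws IOException {
        ByteBuffer last = buffers[buffers.length - 1];
        while (last.hasRemaining()) {
            if (in.read(buffers) < 0) {
                return false; // end-of-stream before a complete frame arrived
            }
        }
        return true;
    }

    /** Sends a multiply request in two pieces and returns the decoded operands. */
    static int[] demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer frame = ByteBuffer.allocate(14);
                frame.put("300100".getBytes(StandardCharsets.UTF_8)).putInt(17).putInt(8);
                frame.flip();

                ByteBuffer code = ByteBuffer.allocate(6);
                ByteBuffer data = ByteBuffer.allocate(8);
                ByteBuffer[] buffers = {code, data};

                frame.limit(9);                    // send only the first 9 bytes
                while (frame.hasRemaining()) sink.write(frame);
                source.read(buffers);              // at most 9 bytes: the frame is still incomplete
                frame.limit(14);                   // now send the remaining 5 bytes
                while (frame.hasRemaining()) sink.write(frame);

                if (!readFully(source, buffers)) {
                    throw new IOException("unexpected end of stream");
                }
                data.flip();
                return new int[] {data.getInt(), data.getInt()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] operands = demo();
        System.out.println("operands: " + operands[0] + ", " + operands[1]);
    }
}
```

Checking only the last buffer is sufficient because the earlier buffers must already be full before any byte lands in it.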
Sum client:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Addition client.
 * @author donald
 * 2017-04-10 21:32:57
 */
public class NIOClientSum {
    private static final String HOST = "192.168.32.126";
    private static final int PORT = 10000;
    // manages the registered channels
    private Selector selector;

    /** Starts the client. */
    public static void main(String[] args) throws IOException {
        NIOClientSum client = new NIOClientSum();
        client.initClient(HOST, PORT);
        client.listen();
    }

    /**
     * Opens the SocketChannel, starts connecting, and registers it for OP_CONNECT.
     * @param ip server IP
     * @param port server port
     */
    public void initClient(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false); // non-blocking mode
        channel.connect(new InetSocketAddress(ip, port));
        this.selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /** Polls the selector and dispatches connect and read events. */
    public void listen() throws IOException {
        System.out.println("===========The Sum Client is start!===========");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // the connection is still pending: complete it
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    System.out.println("=========channel is Connected:" + channel.isConnected());
                    System.out.println("=========channel is Open:" + channel.isOpen());
                    System.out.println("=========channel is ConnectionPending:" + channel.isConnectionPending());
                    // protocol code
                    byte[] codeBytes = ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8);
                    ByteBuffer protocolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
                    System.out.println("ProtocolCode String length:" + codeBytes.length);
                    protocolCodeBuffer.put(codeBytes);
                    System.out.println("ProtocolCode length:" + protocolCodeBuffer.position());
                    protocolCodeBuffer.flip();
                    // protocol body: the two operands
                    ByteBuffer dataBuffer = ByteBuffer.allocate(2 * ProtocolConstants.OPERATE_NUM_LENGTH);
                    dataBuffer.putInt(15);
                    dataBuffer.putInt(6);
                    System.out.println("data length:" + dataBuffer.position());
                    dataBuffer.flip();
                    ByteBuffer[] protocols = new ByteBuffer[] {protocolCodeBuffer, dataBuffer};
                    channel.write(protocols); // send both buffers to the channel
                    System.out.println("=======write proctols to channel");
                    // wait for the reply; the string is an attachment
                    channel.register(this.selector, SelectionKey.OP_READ, "calculateResult");
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /** Handles the reply from the server. */
    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        String attachedInfo = (String) key.attachment();
        System.out.println("========socketChannel attachedInfo:" + attachedInfo);
        ByteBuffer[] protocols = new ByteBuffer[] {
            ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),
            ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)};
        System.out.println("========read caculate result from Server=======");
        // Parse only after the whole reply has arrived; the buffers are filled in order.
        while (protocols[1].hasRemaining()) {
            if (channel.read(protocols) < 0) {
                // read() returns -1 once the server has closed the connection
                key.cancel();
                channel.close();
                return;
            }
        }
        protocols[0].flip();
        protocols[1].flip();
        String protocolCode = new String(protocols[0].array(), ProtocolConstants.CHARSET_UTF8).trim();
        if (protocolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)) {
            int result = protocols[1].getInt();
            System.out.println("========the calculated result from server:" + result);
        } else if (protocolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)) {
            System.out.println("========server decode procotol fail......");
        } else {
            System.out.println("========unknow error ...");
        }
        // channel.shutdownOutput() (closes the write half; a later write throws) and
        // channel.close() are deliberately not called here during testing. If either
        // is called, the server receives one more OP_READ event.
    }
}
Multiply client:
The multiply client has the same structure as the sum client above. As the console output below shows, it differs only in its start-up banner ("The Multiply Client is start!"), the protocol code it sends (ProtocolConstants.MULTI_PROTOCOL_300100 instead of SUM_PROTOCOL_300000), and its operands (17 and 8 instead of 15 and 6). It also does not print the "ProtocolCode String length" line.
Start the server first, then the sum and multiply clients. The console output is:
Server:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is multiply algorithm=======
operate num is:17,8
Sum client:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
Multiply client:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
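In the tests above, calling channel.shutdownOutput() (which closes the write half of the connection) or channel.close() (which closes the channel) causes one more OP_READ event on the SocketChannel at the other end. This is how NIO reports end-of-stream: the channel becomes readable and read() returns -1. We will look at this in more detail in a later article.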
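The extra OP_READ event can be reproduced without a network. The sketch below uses java.nio.channels.Pipe as a stand-in for a TCP connection, on the assumption that the end-of-stream signalling is the same: once the writing end is closed, the reading end becomes ready and read() returns -1. EndOfStreamDemo and readAfterPeerClose are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class; Pipe stands in for a SocketChannel pair.
class EndOfStreamDemo {
    /** Closes the writing end, waits for readiness, and returns what read() reports. */
    static int readAfterPeerClose() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            pipe.sink().close();     // the "peer" goes away without sending any data
            selector.select(2000);   // wait up to two seconds for the readiness event

            int n = source.read(ByteBuffer.allocate(16));
            // On -1 a server would stop watching the channel and release it, as done here.
            key.cancel();
            source.close();
            return n;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read() returned " + readAfterPeerClose());
    }
}
```

A read handler should therefore check for a negative return value, cancel the key, and close the channel, instead of looping until a buffer is full.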
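Also, when working with a Buffer, keep track of which mode it is in. Reading from a channel into the buffer, or calling put, writes to the buffer; writing from the buffer to a channel, or calling get, reads from the buffer. To switch from writing to reading, call flip(): limit is set to the current position and position is reset to 0, so limit marks how much data can be read.
When reusing a buffer, remember to call clear(). clear() does not erase the contents; it only sets position to 0, discards the mark (mark = -1), and sets limit to capacity. This was covered in the ByteBuffer in Detail article; refer back to it if needed.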
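The following minimal sketch traces position and limit through one write, flip, read, clear cycle. BufferModeDemo and describe are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class showing how flip() and clear() move position and limit.
class BufferModeDemo {
    /** Formats the two fields that flip() and clear() change. */
    static String describe(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);

        buf.putInt(15);                                           // write 4 bytes
        System.out.println("after putInt: " + describe(buf));     // position=4 limit=8

        buf.flip();                                               // writing -> reading
        System.out.println("after flip:   " + describe(buf));     // position=0 limit=4

        int value = buf.getInt();                                 // read the 4 bytes back
        System.out.println("read value:   " + value);             // 15

        buf.clear();                                              // ready for reuse
        System.out.println("after clear:  " + describe(buf));     // position=0 limit=8

        // clear() did not erase anything: the old bytes are still in the buffer
        System.out.println("old content:  " + buf.getInt(0));     // 15

        buf.putInt(90);                                           // overwrites the old bytes
        System.out.println("new content:  " + buf.getInt(0));     // 90
    }
}
```

The absolute getInt(0) is used for the last two checks because it reads at a fixed index without moving position.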
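The server above handles client calculation requests on a single thread. Next we rewrite it in multi-threaded form: the server itself only handles connection requests, and each calculation is handed to a separate thread.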
The multi-threaded server:
package nio.handler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Server
 * @author donald
 * 2017-04-13 23:14:28
 */
public class NIOServerCalculateX {
    private static final String HOST = "192.168.32.126";
    private static final int PORT = 10000;
    // Each handler loops forever on its own selector, so a pool of two threads
    // can serve at most two client connections at the same time.
    private static final ExecutorService exec = Executors.newFixedThreadPool(2);
    // manages the server channel
    private Selector selector;

    /** Starts the server. */
    public static void main(String[] args) throws IOException {
        NIOServerCalculateX server = new NIOServerCalculateX();
        server.initServer(HOST, PORT);
        server.listen();
    }

    /** Opens the ServerSocketChannel, binds it, and registers it for OP_ACCEPT. */
    public void initServer(String host, int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false); // non-blocking mode
        serverChannel.socket().bind(new InetSocketAddress(host, port));
        this.selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /** Polls the selector; accepts connections and hands each one to a handler. */
    public void listen() throws IOException {
        System.out.println("=========The Server is start!===========");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    System.out.println("=========channel is Connected:" + channel.isConnected());
                    System.out.println("=========channel is Open:" + channel.isOpen());
                    System.out.println("=========channel is ConnectionPending:" + channel.isConnectionPending());
                    // Register the connection with the handler's own selector before
                    // the handler starts running, then submit the handler to the pool.
                    HanlderNioSocketChannel hanlderNioSocketChannel = new HanlderNioSocketChannel();
                    channel.register(hanlderNioSocketChannel.getSelector(), SelectionKey.OP_READ, "decodeProtol");
                    exec.submit(hanlderNioSocketChannel);
                }
            }
        }
    }
}
Calculation handler thread:
package nio.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import nio.socketchannel.ProtocolConstants;

/**
 * Handles SocketChannel read events.
 * @author donald
 * 2017-04-11 22:32:55
 */
public class HanlderNioSocketChannel implements Runnable {
    private Selector selector;
    // Captured in the constructor, so this is the name of the thread that creates the
    // handler (main), not of the pool thread that runs it. This is why every handler
    // line in the server output below is prefixed with "main".
    private String threadName;

    public HanlderNioSocketChannel() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        threadName = Thread.currentThread().getName();
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            listen();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Polls this handler's selector and dispatches read events. */
    public void listen() throws IOException {
        System.out.println(threadName + "=========The Server Calculate is start!===========");
        while (true) {
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /** Handles one request from a client. */
    private void read(SelectionKey key) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            String attachedInfo = (String) key.attachment();
            System.out.println(threadName + "========socketChannel attachedInfo:" + attachedInfo);
            // protocol code
            ByteBuffer protocolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
            // protocol body: the two operands
            ByteBuffer dataBuffer = ByteBuffer.allocate(2 * ProtocolConstants.OPERATE_NUM_LENGTH);
            ByteBuffer[] protocols = new ByteBuffer[] {protocolCodeBuffer, dataBuffer};
            System.out.println(threadName + "========read caculate proctol from Client=======");
            // Parse only after the whole request has arrived; the buffers are filled in order.
            while (dataBuffer.hasRemaining()) {
                if (channel.read(protocols) < 0) {
                    // read() returns -1 once the client has closed the connection
                    key.cancel();
                    channel.close();
                    return;
                }
            }
            protocolCodeBuffer.flip();
            dataBuffer.flip();
            String protocolCode = new String(protocolCodeBuffer.array(), ProtocolConstants.CHARSET_UTF8).trim();
            String ackCode = ProtocolConstants.ACK_PROTOCOL_300200;
            int result = 0;
            if (protocolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)) {
                System.out.println(threadName + "========the protocol is sum algorithm=======");
                int firstNum = dataBuffer.getInt();
                int secondNum = dataBuffer.getInt();
                System.out.println("operate num is:" + firstNum + "," + secondNum);
                // Addition. The listing as first posted multiplied here by mistake, which
                // is why the console output below reports 90 for 15 and 6.
                result = firstNum + secondNum;
            } else if (protocolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)) {
                System.out.println(threadName + "========the protocol is multiply algorithm=======");
                int firstNum = dataBuffer.getInt();
                int secondNum = dataBuffer.getInt();
                System.out.println("operate num is:" + firstNum + "," + secondNum);
                result = firstNum * secondNum;
            } else {
                System.out.println(threadName + "========server decode procotol fail......");
                ackCode = ProtocolConstants.ACK_PROTOCOL_300300;
            }
            // Reuse both buffers for the reply: clear() resets position and limit,
            // put() overwrites the old bytes, flip() prepares them for channel.write().
            protocolCodeBuffer.clear();
            protocolCodeBuffer.put(ackCode.getBytes(ProtocolConstants.CHARSET_UTF8));
            protocolCodeBuffer.flip();
            dataBuffer.clear();
            dataBuffer.putInt(result);
            dataBuffer.flip();
            channel.write(protocols);
            // channel.shutdownOutput() (closes the write half; a later write throws) and
            // channel.close() are deliberately not called here during testing. When one
            // side calls either of them, the other side receives one more OP_READ event.
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
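The handler stores Thread.currentThread().getName() in its constructor, and the constructor runs on the thread that accepts connections, not on the pool thread that later executes the handler. The sketch below isolates that effect; ThreadNameDemo, NamedTask and capture are hypothetical names, and a Callable is used instead of a Runnable only so the demo can return the name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the article's listings.
class ThreadNameDemo {
    /** Records the thread name at construction time and again at execution time. */
    static class NamedTask implements Callable<String> {
        // evaluated on the thread that calls `new NamedTask()`
        final String constructedOn = Thread.currentThread().getName();

        @Override
        public String call() {
            // evaluated on the pool thread that actually runs the task
            return Thread.currentThread().getName();
        }
    }

    /** Returns {name seen in the constructor, name seen while running}. */
    static String[] capture() {
        ExecutorService exec = Executors.newFixedThreadPool(1);
        try {
            NamedTask task = new NamedTask();
            String ranOn = exec.submit(task).get();
            return new String[] {task.constructedOn, ranOn};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = capture();
        System.out.println("constructed on: " + names[0]);
        System.out.println("ran on:         " + names[1]);
    }
}
```

To log the worker thread's name instead, the handler would read Thread.currentThread().getName() at the start of run() rather than in the constructor.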
Start the server first, then the sum and multiply clients. The console output is:
Server:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is multiply algorithm=======
operate num is:17,8
Sum client:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
Multiply client:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
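Summary:
When working with a Buffer, keep track of which mode it is in. Reading from a channel into the buffer, or calling put, writes to the buffer; writing from the buffer to a channel, or calling get, reads from the buffer. To switch from writing to reading, call flip(): limit is set to the current position and position is reset to 0, so limit marks how much data can be read.
When reusing a buffer, remember to call clear(). clear() does not erase the contents; it only sets position to 0, discards the mark (mark = -1), and sets limit to capacity. Data written afterwards overwrites the old data.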