传统IOSocket和NIOSocket

下面是传统IO实现的socket
服务端代码

package com.aekc.old;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
    private static ExecutorService tp = Executors.newCachedThreadPool();

    static class HandleMsg implements Runnable {
        Socket clientSocket;
        public HandleMsg(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }
        @Override
        public void run() {
            BufferedReader is = null;
            PrintWriter os = null;
            try {
                is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                os = new PrintWriter(clientSocket.getOutputStream(), true);
                // 从InputStream当中读取客户端所发送的数据
                String inputLine = null;
                while((inputLine = is.readLine()) != null) {
                    os.println(inputLine);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if(is != null) {
                        is.close();
                    }
                    if(os != null) {
                        os.close();
                    }
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        ServerSocket server = null;
        Socket clientSocket = null;
        try {
            server = new ServerSocket(8100);
            while(true) {
                try {
                    clientSocket = server.accept();
                    System.out.println("connect to client!");
                    tp.execute(new HandleMsg(clientSocket));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

客户端代码

package com.aekc.old;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {

    public static void main(String[] args) {
        Socket client = null;
        PrintWriter writer = null;
        BufferedReader reader = null;
        try {
            client = new Socket();
            client.connect(new InetSocketAddress("localhost", 8100));
            writer = new PrintWriter(client.getOutputStream(), true);
            writer.println("Hello!");
            writer.flush();
            reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
            System.out.println("from server: " + reader.readLine());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(writer != null) {
                writer.close();
            }
            if(reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(client != null) {
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接下来是NIO实现的socket
服务器无需通过为每个客户端的链接而开启一个线程。而是通过一个叫Selector的轮循器来不断的检测那个Channel有消息处理。当发现有消息要处理时,通过selectedKeys()方法就可以获取所有有消息要处理的Set集合了。使用NIO技术后,即使客户端迟钝或者出现了网络延迟等现象,并不会给服务器带来太大问题。

服务端代码

package com.aekc.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerNIO {

    private Selector selector;

    private ExecutorService tp = Executors.newCachedThreadPool();

    class HandleMsg implements Runnable {
        ByteBuffer byteBuffer;
        SelectionKey key;
        public HandleMsg(ByteBuffer byteBuffer, SelectionKey key) {
            this.byteBuffer = byteBuffer;
            this.key = key;
        }
        @Override
        public void run() {
            byteBuffer.flip();

            //byte[] bytes = new byte[byteBuffer.remaining()];
            //byteBuffer.get(bytes);
            //System.out.println(new String(bytes, 0, bytes.length));
            System.out.println(new String(byteBuffer.array(), 0, byteBuffer.limit()));

            //将此键的 interest 集合设置为给定值
            key.interestOps(SelectionKey.OP_WRITE);
            //强迫selector返回, 使尚未返回的第一个选择操作立即返回, 即取消selector.select()的阻塞
            selector.wakeup();
        }
    }

    private void doAccept(SelectionKey key) {
        //返回创建此键的通道
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel;
        try {
            //生成和客户端的通信的通道
            clientChannel = server.accept();
            //设置非阻塞模式
            clientChannel.configureBlocking(false);
            //注册选择器, 读就绪
            clientChannel.register(selector, SelectionKey.OP_READ);
            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("连接到客户端, 客户端ip: " + clientAddress.getHostAddress());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void doRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            int readBytes = channel.read(byteBuffer);
            if(readBytes > 0) {
                tp.execute(new HandleMsg(byteBuffer, key));
            }
        } catch (IOException e) {
            //请求取消此键的通道到其选择器的注册
            key.cancel();
            if(key.channel() != null) {
                key.channel().close();
            }
            e.printStackTrace();
        }

    }

    private void doWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            byteBuffer.put("客户端,我服务端收到消息了".getBytes());
            byteBuffer.flip();
            channel.write(byteBuffer);
        } catch (Exception e) {
            key.channel();
            if(key.channel() != null) {
                key.channel().close();
            }
            e.printStackTrace();
        }
        //将此键的 interest 集合设置为给定值
        key.interestOps(SelectionKey.OP_READ);
    }

    private void startServer() throws IOException {
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //设置为非阻塞模式
        ssc.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress(8100);
        ssc.socket().bind(isa);

        //让Selector为这个Channel服务, 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
        // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            //阻塞方法
            selector.select();
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();

                //避免重复处理相同的SelectionKey
                iterator.remove();

                //测试此键的通道是否已准备好接受新的套接字连接(socket连接)
                if(key.isAcceptable()) {
                    doAccept(key);
                //此键是否有效&&此键的通道是否已准备好进行读取
                } else if(key.isValid() && key.isReadable()) {
                    doRead(key);
                //此键是否有效&&此键的通道是否已准备好进行写入
                } else if(key.isValid() && key.isWritable()) {
                    doWrite(key);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new ServerNIO().startServer();
    }
}

客户端代码

package com.aekc.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

public class ClientNIO {

    private Selector selector;

    public void init(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = SelectorProvider.provider().openSelector();
        channel.connect(new InetSocketAddress(ip, port));
        //连接就绪,表示客户与服务器的连接已经建立成功
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    public void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //如果正在连接, 则完成连接
            if(channel.isConnectionPending()) {
                //完成套接字通道的连接过程
                channel.finishConnect();
            }
            channel.configureBlocking(false);
            channel.write(ByteBuffer.wrap(new String("hello server, I am client!\r\n").getBytes()));
            //注册选择器,读就绪
            channel.register(this.selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            channel.close();
            key.selector().close();
        }

    }

    public void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        //创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        try {
            channel.read(buffer);
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("客户端收到信息: " + msg);
        } catch (IOException e) {
            channel.close();
            key.selector().close();
        }
    }

    public void working() throws IOException {
        while(true) {
            if(!selector.isOpen()) {
                break;
            }
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while(ite.hasNext()) {
                SelectionKey key = ite.next();
                ite.remove();
                //连接事件发生
                if(key.isConnectable()) {
                    connect(key);
                } else if(key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ClientNIO c = new ClientNIO();
        c.init("127.0.0.1", 8100);
        c.working();
    }
}
                                                                --参考书籍《java高并发程序设计》

猜你喜欢

转载自blog.csdn.net/XlxfyzsFdblj/article/details/79746221