NIO与Socket笔记 :Selector 类的使用

Selector类的主要作用是作为 SelectableChannel对象的多路复用器。

可通过调用 Selector类的 open()方法创建选择器,该方法将使用系统的默认 Selector­ Provider创建新的选择器。
也可通过调用自定义选择器提供者的 openSelector()方法来创建 选择器。

在通过选择器的 close()方法关闭选择器之前,选择器一直保持打开状态。

通过 SelectionKey 对象来表示 SelectableChannel (可选择通道)到选择器的注册 。

选择 器维护了 3 种 SelectionKey-Set (选择键集) 。

1 )键集 : 包含的 键表示当前通道到此选择器的注册,也就是通过某个通道的 register() 方法注册该通道时,所带来的影响是向选择器的键集中添加了一个键。 此集合由 keys()方 法返回。 键集本身是不可直接修改的。

2 )已选择键集 : 在首先调用 select()方法选择操作期间,检测每个键的通道是否已经至少为该键的相关操作集所标识的一个操作准备就绪,然后调用 selectedKeys()方法返回已就绪键的集合 。 已选择键集始终是键集的一个子集 。

3 )已取消键集:表示已被取消但其通道尚未注销的键的集合 。 不可直接访问此集合 。 已 取消键集始终是键集的一个子集 。 在 select()方法选择操作期间,从键集中移除已取消的键 。

在新创建的选择器中,这 3 个集合都是空集合 。

验证 public abstract int select()方法具有阻塞性

public abstract int select()方法的作用是选择一组键,其相应的通道己为 I/O 操作准备 就绪。 此方法执行处于阻塞模式的选择操作。

仅在至少选择一个通道、调用此选择器的 wakeup()方法,或者当前的线程已中断(以先到者为准)后,此方法才返回。 返回值代表添 加到就绪操作集的键的数目,该数目可能为零 , 为零代表就绪操作集中的内容并没有添加新 的键,保持内容不变 。

select()方法不阻塞的原因和解决办法

出现 “死循环”的原因 是在客户端连接服务端时,服务端中的通道对 accept事件并 未处理 , 导致 accept事件一直存在,

也就是 select()方法一直检测到有准备好的通道要对 accept事件进行处理,但一直未处理,就一直呈 “死循环”输出的状态了

解决“死循环” 的办法是将 accept事件消化处理。

package com.nio.selector.test02;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;


public class server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel . open() ; 
        serverSocketChannel .bind( new InetSocketAddress( "localhost", 8888)) ;
        serverSocketChannel . configureBlocking( false ) ;
        Selector selectorl = Selector . open();
        SelectionKey selectionKeyl = serverSocketChannel.register(selectorl , SelectionKey.OP_ACCEPT) ;
        boolean isRun = true;
        while (isRun == true) {
            int keyCount = selectorl.select() ;
            Set<SelectionKey> setl = selectorl.keys() ;
            Set<SelectionKey> set2 = selectorl .selectedKeys() ;
            System. out.println("keyCount =" + keyCount);
            System. out .println ("setl size=" + setl.size()) ;
            System.out.println("set2 size=" + set2.size());
            System. out .println() ;
            Iterator<SelectionKey> iterator= set2.iterator();
            while (iterator .hasNext () ) {
                SelectionKey key = iterator.next() ;
                ServerSocketChannel channel = (ServerSocketChannel) key . channel() ;
                channel . accept () ; //使用方法 accept ()将事件处理掉

            }
        }

        serverSocketChannel . close() ;

    }


}
        
package com.nio.selector.test02;

import java.io.IOException;
import java.net.Socket;

public class client {
    public static void main(String[] args) throws IOException {
        Socket socket= new Socket("localhost" , 8888 ) ;
        socket.close () ;
    }
}

出现重复消费的情况

如果两个不同的通道注册到相同的选择器,那么极易出现重复消费的情况。

package com.nio.selector.test03;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open(); 
        serverSocketChannel1.bind(new InetSocketAddress ("localhost", 7777)) ; 
        serverSocketChannel1 .configureBlocking( false );

        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        serverSocketChannel2.bind( new InetSocketAddress( "localhost" , 8888)) ;
        serverSocketChannel2.configureBlocking (false ) ;


        Selector selector1 =Selector.open() ;

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1, SelectionKey.OP_ACCEPT) ;
        SelectionKey selectionKey2 = serverSocketChannel2 .register(selector1, SelectionKey.OP_ACCEPT ) ;

        boolean isRun = true ;
        while (isRun== true) {
            int keyCount = selector1 .select();
            Set<SelectionKey> setl = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys() ;
            System.out.println( "keyCount ="+ keyCount) ;
            System.out.println (" set1 size=" + setl.size()) ;
            System.out.println (" set2 size=" + set2.size() ) ;
            Iterator<SelectionKey> iterator= set2.iterator();

            while (iterator.hasNext()) {
                SelectionKey key= iterator .next();
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();

                SocketChannel socketChannel = channel.accept() ;

                InetSocketAddress ipAddress = (InetSocketAddress) channel.getLocalAddress();


                System.out.println(ipAddress.getPort() + " 被客户端连接了 ");
                System.out.println();
//                iterator. remove () ;// 删除当前的 SelectionKey


            }

        }
        serverSocketChannel1.close() ;
        serverSocketChannel2.close();
    }


}
        

输出: 

keyCount =1
 set1 size=2
 set2 size=1
7777 被客户端连接了 

keyCount =1
 set1 size=2
 set2 size=2
7777 被客户端连接了 

8888 被客户端连接了 

keyCount =0
 set1 size=2
 set2 size=2
7777 被客户端连接了 

8888 被客户端连接了 

使用 remove()方法解决重复消费问题

package com.nio.selector.test03;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open(); 
        serverSocketChannel1.bind(new InetSocketAddress ("localhost", 7777)) ; 
        serverSocketChannel1 .configureBlocking( false );

        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        serverSocketChannel2.bind( new InetSocketAddress( "localhost" , 8888)) ;
        serverSocketChannel2.configureBlocking (false ) ;


        Selector selector1 =Selector.open() ;

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1, SelectionKey.OP_ACCEPT) ;
        SelectionKey selectionKey2 = serverSocketChannel2 .register(selector1, SelectionKey.OP_ACCEPT ) ;

        boolean isRun = true ;
        while (isRun== true) {
            int keyCount = selector1 .select();
            Set<SelectionKey> setl = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys() ;
            System.out.println( "keyCount ="+ keyCount) ;
            System.out.println (" set1 size=" + setl.size()) ;
            System.out.println (" set2 size=" + set2.size() ) ;
            Iterator<SelectionKey> iterator= set2.iterator();

            while (iterator.hasNext()) {
                SelectionKey key= iterator .next();
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();

                SocketChannel socketChannel = channel.accept() ;

                InetSocketAddress ipAddress = (InetSocketAddress) channel.getLocalAddress();


                System.out.println(ipAddress.getPort() + " 被客户端连接了 ");
                System.out.println();
                iterator. remove () ;// 删除当前的 SelectionKey


            }

        }
        serverSocketChannel1.close() ;
        serverSocketChannel2.close();
    }


}
        

输出:

import java.io.IOException;
import java.net.Socket;

public class CientA {
    public static void main(String[] args) throws IOException {
        Socket socket= new Socket("localhost" , 7777 ) ;
        socket.close () ;
    }
}
public class CientB {
    public static void main(String[] args) throws IOException {
        Socket socket= new Socket("localhost" , 8888 ) ;
        socket.close () ;
    }
}
package com.nio.selector.test03;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;


public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open(); 
        serverSocketChannel1.bind(new InetSocketAddress ("localhost", 7777)) ; 
        serverSocketChannel1 .configureBlocking( false );

        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        serverSocketChannel2.bind( new InetSocketAddress( "localhost" , 8888)) ;
        serverSocketChannel2.configureBlocking (false ) ;


        Selector selector1 =Selector.open() ;

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1, SelectionKey.OP_ACCEPT) ;
        SelectionKey selectionKey2 = serverSocketChannel2 .register(selector1, SelectionKey.OP_ACCEPT ) ;

        boolean isRun = true ;
        while (isRun== true) {
            int keyCount = selector1 .select();
            Set<SelectionKey> setl = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys() ;
            System.out.println( "keyCount ="+ keyCount) ;
            System.out.println (" set1 size=" + setl.size()) ;
            System.out.println (" set2 size=" + set2.size() ) ;
            Iterator<SelectionKey> iterator= set2.iterator();

            while (iterator.hasNext()) {
                SelectionKey key= iterator .next();
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();

                SocketChannel socketChannel = channel.accept() ;

                InetSocketAddress ipAddress = (InetSocketAddress) channel.getLocalAddress();


                System.out.println(ipAddress.getPort() + " 被客户端连接了 ");
                System.out.println();
                iterator. remove () ;// 删除当前的 SelectionKey


            }

        }
        serverSocketChannel1.close() ;
        serverSocketChannel2.close();
    }


}
        

输出:

keyCount =1
 set1 size=2
 set2 size=1
7777 被客户端连接了 

keyCount =1
 set1 size=2
 set2 size=1
8888 被客户端连接了 

keyCount =1
 set1 size=2
 set2 size=1
7777 被客户端连接了 

验证产生的 set1 和 set2 关联的各自对象一直是同一个

setl 和 set2一直在使用各自不变的对象,也就会出现一直向 set2中添加 S巳lection­ Key造成重复消费的效果 ,

因此,就要结合 remove()方法避免重复消费。

int selector.select()方法返回值的含义

int selector.select()方法返回值的含义是己更新其准备就绪操作集的键的数 目 ,该数 目 可能为零或排零, 非零的情况就是向set2中添加SelectionKey的个数, 值为零的情况是 set2 中的元素并没有更改。

从已就绪的键集中获得通道中的数据

package com.nio.selector.test05;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost", 7777)) ;
        serverSocketChannel1 .configureBlocking( false );




        Selector selector =Selector.open() ;

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT) ;


        boolean isRun = true ;
        while (isRun== true) {
            int keyCount = selector .select();
            Set<SelectionKey> set1 = selector.keys();
            Set<SelectionKey> set2 = selector.selectedKeys() ;
            System.out.println( "keyCount ="+ keyCount) ;
            System.out.println (" set1 size=" + set1.size()) ;
            System.out.println (" set2 size=" + set2.size() ) ;

            Iterator<SelectionKey> iterator= set2.iterator();

            while (iterator.hasNext()) {
                SelectionKey key= iterator .next();

                if(key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    ServerSocket serverSocket = channel.socket() ;

                    Socket socket = serverSocket.accept() ;
                    InputStream inputStream = socket .getInputStream();
                    byte [] byteArray = new byte[1000];
                    int readLength = inputStream . read(byteArray) ;
                    while (readLength != -1){
                        String newString = new String(byteArray,0,readLength);
                        System.out.println(newString);
                        readLength = inputStream.read(byteArray);

                    }

                    inputStream.close();
                    socket.close();

                    InetSocketAddress ipAddress = (InetSocketAddress) channel.getLocalAddress();


                    System.out.println(ipAddress.getPort() + " 被客户端连接了 ");
                    System.out.println();
                    iterator. remove () ;// 删除当前的 SelectionKey

                }



            }

        }
        serverSocketChannel1.close() ;

    }


}
package com.nio.selector.test05;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class CientA {
    public static void main(String[] args) throws IOException {
        Socket socket= new Socket("localhost" , 7777 ) ;
        OutputStream outputStream = socket.getOutputStream() ;
        outputStream.write("正正正正正".getBytes());


        socket.close () ;
    }
}

输出:

Connected to the target VM, address: '127.0.0.1:52670', transport: 'socket'
keyCount =1
 set1 size=1
 set2 size=1
正正正正正
7777 被客户端连接了 

keyCount =1
 set1 size=1
 set2 size=1
正正正正正
7777 被客户端连接了 
 

对相同的通道注册不同的相关事件返回同一个 SelectionKey

一个 SocketChannel通道注册两个事件并没有创建出两个 SelectionKey,而是创建出一 个,

不同的事件是在同一个 SelectionKey 中进行注册的。

另一个 SelectionKey代表关联的是 ServerSocketChannel通道。

判断选择器是否为打开状态

public abstract boolean isOpen()方法的作用是告知此选择器是否已打开 。

返回值 当且仅 当此选择器已打开时才返回 true。

public abstract void close()方法的作用是关闭此选择器。

获得 SelectorProvider provider 对象

public abstract SelectorProvider provider()方法的作用是返回创建此通道的提供者 。

返回此选择器的键集

public abstract Set<SelectionKey> keys()方法的作用是返回此选择器的键集。不可直接 修改键集 。

仅在已取消某个键并且已 注销 其通道后才移除该键 。

试图修改键集会导致抛出 UnsupportedOperationException。

public abstract int select(long timeout)方法的使用

public abstract int select(long timeout)方法的作用是选择一组键,其相应的通道已为 I/O操作准备就绪。

此方法执行处于阻塞模式的选择操作。

仅在至少选择一个通道、 调用此选择 器的 wakeup()方法、 当前的线程已中断,或者给定的超时期满(以先到者为准)后, 此方法 才返回。 此方法不提供实时保证:它安排了超时时间, 就像调用Object.wait(long)方法一样。

参数 timeout代表如果为正, 则在等待某个通道准备就绪时最多阻塞 timeout毫秒;

如果为 零 , 则无限期地阻塞 ;  必须为非负数。

返回值代表己更新其准备就绪操作集的键的数 目, 该 数目可能为零。

public abstract int selectNow()方法的使用

public abstract int selectNow()方法的作用是选择一组键,其相应的通道已为 1/0操作准 备就绪 。

此方法执行非 阻塞的选择操作 。 如果自 从前一次选择操作后,没有通道变成可选择 的, 则此方法直接返回零。

调用此方法会清除所有以前调用 wakeup()方法所得的结果。

返回值代表由选择操作更新其准备就绪操作集的键的数目,该数目 可能为零。

唤醒操作

public abstract Selector wakeup()方法的作用是使尚未返回的第一个选择操作立即返回。

如果另一个线程目前正阻塞在 select()或 select(long)方法的调用中, 则该调用将立即返回。

测试若干细节

对 SelectionKey 执行 cancel()方法后的效果

调用该键的 caneeI()方法来取消键,该键都被添加到其选择器的已取消键集中。

取消某 个键会导致在下一次 select() 方法选择操作期间注销该键的通道,而在注销时将从所有选择 器的键集中移除该键 。

 

对通道执行 close()方法后的效果

关闭某个键的通道 ,通道对应的键都被添加到其选择器的已取消键集中 ,

会导致在下 一次 select()方法选择操作期间注销该键的通道,而在注销时将从所有选择器的键集中移除 该键。

在新创建的选择器中, 3 个集合都是空集合

删除键集中的键会导致 UnsupportedOperationException 异常

多线程环境下删除键集中的键会导致 ConcurrentModificationException 异常

一般情况下,选择器的键和巳选择键集由多个并发线程使用是不安全的。

阻塞在 select()或 select (long)方法中的线程通过选择器的 close()方法被中断

阻塞在 select()或 select (long)方法中的线程调用 interrupt() 方法被中断

调用 Selector.close()万法删除全部键并且通道注销

SelectionKey 类的使用

SelectionKey类表示 SelectableChannel在选择器中的注册的标记。

在每次向选择器注册通道时,就会创建一个选择键( SelectionKey)。 通过调用某个键 的 cancel()方法、关闭其通道,或者通过关闭其选择器取消该键之前,通道一直保持有效。 取消某个键不会立即从其选择器中移除它,而是将该键添加到选择器的已取消键集,以便 在下一次进行 select()方法操作时移除它 。 可通过调用某个键的 isValid()方法来测试其有 效性。

选择键包含两个集,是表示为整数值的操作集,其中每一位都表示该键通道所支持的 一类可选择操作 。

1 ) interest 集 ,确定了下一次调用某个选择器的 select()方法时,将测试哪类操作的准 备就绪信息 。

创建该键时使用给定的值初始化 interest集合,之后可通过 interestOps(int)方 法对其进行更改 。

2) ready集,标识了这样一类操作, 即某个键的选择器检测到该键的通道已为此类操 作准备就绪 。

在创建该键时, ready 集初始化为零,可以在之后的 select()方法操作中通过选 择器对其进行更新,但不能直接更新它 。

判断是否允许连接 SelectableChannel 又像

public final boolean isAcceptable()方法的作用是测试此键的通道是否已准备好接受新的套接字连接

public final boolean isConnectable()方法的作用是测 试此键的通道是否已完成其套接字连接操作 。

public abstract SelectableChannel channel()方法的作用是返回为之创建此键的通道。即 使已取消该键,此方法仍继续返回通道 。

判断是否已准备好进行读取

public final boolean isReadable()方法的作用是测试此键的通道是否已准备好进行读取。

判断是否已准备好进行写入

public final boolean isWritable()方法的作用是测试此键的通道是否已准备好进行写入。

返回 SelectionKey 关联的选择器

public abstract Selector selector()方法的作用是返回 SelectionKey关联的选择器 。 即使 已取消该键, 此方法仍将继续返回选择器 。

在注册操作时传入 attachment 附件

SelectableChannel类中的 public final SelectionKey register(Selector sel, int ops, Object att)方法的作用是向给定的选择器注册此通道,返回一个选择键。

如果当前已向给定的选择 器注册了此通道,则返回表示该注册的选择键。

设置 attachment 附件

public final Object attach(Object ob)方法的作用是将 给定 的对象附加到此键 。

之后可通 过 attachment()方法获取已附加的对象。一次只能附加一个对象。

调用此方法会导致丢弃所 有以前的附加对象。 通过附加 null可丢弃当前的附加对象。

参数 ob代表要附加的对象,可 以为 null。 返回值代表先前已附加的对象(如果有),否则返回 null。

获取与设置此键的 interest 集合

public abs位act int interestOps()方法的作用是获取此键的 interest集合。

可保证返回的集 合仅包含对于 此键的通道而言有效的操作位。

可在任意时 间调用此方法 。 是否受阻塞,以及 阻塞时间长短都是与实现相关的 。

返回值代表此键的 interest集合 。

public abstract SelectionKey interestOps(int ops)方法的作用是将此键的 interest集合设置 为给定值。

可在任意时间调用此方法。是否受阻塞 ,以及阻塞时间长短都是与实现相关的 。

参数 ops代表新的 interest集合,返回值代表此选择键。

判断此键是否高效

public abstract boolean isValid()方法的作用是告知此键是否有效 。

键在创建时是有效的,并在被取消、其通道己关闭或者其选择器己关闭之前保持有效。

返回值当且仅当 此键有 效时才返回 true。

获取此键的 ready 操作集合

public abstract int readyOps()方法的作用是获取此键的 ready操作集合,

可保证返回的 集合仅包含对于此键的通道而言有效的操作位,返回值代表此键的 ready操作集合。

 

取消操作

public abstract void cancel()方法的作用是请求取消此键的通道到其选择器的注册。

一旦 返回,该键就是无效的, 并且将被添加到其选择器的已取消键集中。

在进行下一次选择操作 时 , 将从所有选择器的键集 中移除该键。

如果已取消了此键 ,则调用此方法元效。

DatagramChannel 类的使用

DatagramChannel类是针对面向 DatagramSocket的可选择通道。

DatagramChannel不是 DatagramSocket 的完 整抽象,必须通过调用 socket()方法获得的关联 DatagramSocket对象来完成套接宇选项的绑定和操作。

使用 DatagramChannel 类实现 UDP 通信

连接操作

public abstract DatagramChannel connect(SocketAddress remote)方法的作用是连接此通 道的套接字 。

断开连接

public abstract DatagramChannel disconnect()方法的作用是断开此通道套接字的连接。

将通道加入组播地址

将通道加入组播地址旦接收指定客户端数据

MembershipKey join(InetAddress group, Networklnterface interf, lnetAddress source)方法的 作用是将通道加入到组播地址中,

但是会通过 source参数来接收指定客户端 IP 发来的数据包。

Pipe.SinkChannel 和 Pipe.SourceChannel 类的使用

Pipe.SinkChannel类表示 Pipe的可写人结尾的通道.

Pipe类实现单向 管道传送的通道对 。

管道由一对通道组成:一个可写人的 sink通道和一个可读 取的 source通道。

一旦将某些字节写入接收器通道,就可以按 照与写入时完全相同的顺序从源通道中读取这些字节 。

在另一个线程从管道中读取这些字节或先前已写人的字节之前,是否阻塞将该字节写人管道的线程是与系统相关的,因
此是未指定的。

很多管道实现都对接收器和源通道之间一定数量的字节进行缓冲,但是不应 假定会进行这种缓冲 。

SelectorProvider 类的使用

SelectorProvider是用 于选择 器 和可选择通道的服务提供者类 。

选择器提供者实现类是 SelectorProvider类的一个子类,它具有零参数的构造方法, 并实现了以下指定的抽象方法。

给定的对 Java虚拟机的调用维护了单个系统级的默认提供者实例,它由 provider()方法返回。

第一次调用该 方 法将查 找 指定 的 默 认 提 供 者。

系统级的默认提供者由 Datagram­ Channel、 Pipe、 Selector、 ServerSocketChannel 和 SocketChannel类的静态 open()方法使用。

System.inheritedChann巳I() 方法 也使用它 。

除了 默认提供者之外,程序还可以使用其他提供 者, 方法是通过实例化一个提供者,然后直接调用此类 中定义的 open()方法。

多个并发线程可安全地使用 SelectorProvider类中的所有方法。

猜你喜欢

转载自blog.csdn.net/zhanglong_4444/article/details/89064714