DatagramChannelImpl 解析四(地址绑定,关闭通道等)

DatagramChannelImpl 解析一(初始化): http://donald-draper.iteye.com/blog/2373245
DatagramChannelImpl 解析二(报文发送与接收): http://donald-draper.iteye.com/blog/2373281
DatagramChannelImpl 解析三(多播): http://donald-draper.iteye.com/blog/2373507
引言:
前面一篇文章我们看了报文通道加入多播组,阻塞和解除阻塞源地址报文等方法,先来回顾一下,
    join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
    阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
    drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。
今天来看报文通道的其他方法,
先来看地址绑定
public volatile NetworkChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        return bind(socketaddress);
    }
 public DatagramChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
		    //同步读写锁,及状态锁,确保通道打开
                    ensureOpen();
		    //如果本地地址不为null,则已绑定
                    if(localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress inetsocketaddress;
                    if(socketaddress == null)
                    {
		        //如果绑定的socket地址为null,则创建统配地址为绑定地址
                        if(family == StandardProtocolFamily.INET)
                            inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                        else
                            inetsocketaddress = new InetSocketAddress(0);
                    } else
                    {
		        //否则检查socket地址
                        inetsocketaddress = Net.checkAddress(socketaddress);
                        if(family == StandardProtocolFamily.INET)
                        {
                            InetAddress inetaddress = inetsocketaddress.getAddress();
                            if(!(inetaddress instanceof Inet4Address))
                                throw new UnsupportedAddressTypeException();
                        }
                    }
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查socket端口监听权限
                        securitymanager.checkListen(inetsocketaddress.getPort());
		   //委托给Net
                    Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
		    //初始化本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

再来看连接操作
public DatagramChannel connect(SocketAddress socketaddress)
        throws IOException
    {
        boolean flag = false;
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
		    //同步读写及状态锁,确保通道打开,未建立连接
                    ensureOpenAndUnconnected();
                    InetSocketAddress inetsocketaddress = Net.checkAddress(socketaddress);
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查连接socket地址权限
                        securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
                    int i = Net.connect(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
                    if(i <= 0)
                        throw new Error();
                    state = 1;//已来你就饿
		    //初始化远端地址
                    remoteAddress = socketaddress;
                    sender = inetsocketaddress;//初始化发送者地址
		    //缓存发送者地址及端口
                    cachedSenderInetAddress = inetsocketaddress.getAddress();
                    cachedSenderPort = inetsocketaddress.getPort();
		    //根据文件描述符获取本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

再来看断开连接方法
public DatagramChannel disconnect()
        throws IOException
    {
        Object obj = readLock;
        JVM INSTR monitorenter ;
        Object obj1 = writeLock;
        JVM INSTR monitorenter ;
        Object obj2 = stateLock;
        JVM INSTR monitorenter ;
	//同步读写及状态锁
	//确保处于连接状态或通道打开
        if(!isConnected() || !isOpen())
            return this;
	//获取远端地址
        InetSocketAddress inetsocketaddress = (InetSocketAddress)remoteAddress;
        SecurityManager securitymanager = System.getSecurityManager();
        if(securitymanager != null)
	    //检查连接远端地址权限
            securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
       //完成实际断开连接操作
	disconnect0(fd);
        remoteAddress = null;
        state = 0;//未连接
        localAddress = Net.localAddress(fd);
        ...
        return this;
    }
private static native void disconnect0(FileDescriptor filedescriptor)
        throws IOException;

再来看配置通道阻塞模式
protected void implConfigureBlocking(boolean flag)
        throws IOException
    {
        //委托给IOUtil
        IOUtil.configureBlocking(fd, flag);
    }

再来看关闭通道方法
 protected void implCloseSelectableChannel()
        throws IOException
    {
        synchronized(stateLock)
        {
            if(state != 2)
	        //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述
                nd.preClose(fd);
	   //更新报文socket计数器,自减1
            ResourceManager.afterUdpClose();
            if(registry != null)
	        //注册器不为null,则使注册器中的所有多播组无效
                registry.invalidateAll();
            long l;
	    //通知本地读写线程
            if((l = readerThread) != 0L)
                NativeThread.signal(l);
            if((l = writerThread) != 0L)
                NativeThread.signal(l);
            if(!isRegistered())
	        //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作
                kill();
        }
    }

上面有两点要关注
1.
 if(state != 2)
     //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述
     nd.preClose(fd);

//NativeDispatcher
    void preClose(FileDescriptor filedescriptor)
        throws IOException
    {
    }

2.
if(!isRegistered())
        //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作
        kill();

public void kill()
        throws IOException
    {
label0:
        {
            synchronized(stateLock)
            {
                if(state != 2)
		    //如果状态为非关闭,则跳到label0
                    break label0;
            }
            return;
        }
        if(state != -1)
            break MISSING_BLOCK_LABEL_34;
        state = 2;
        obj;
        JVM INSTR monitorexit ;
        return;
        if(!$assertionsDisabled && (isOpen() || isRegistered()))
            throw new AssertionError();
	//关闭文件描述
        nd.close(fd);
        state = 2;
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }

//DatagramDispatcher
 void close(FileDescriptor filedescriptor)
        throws IOException
    {
        SocketDispatcher.close0(filedescriptor);
    }

从上面可以看出,关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;
注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;
委托报文分发器DatagramDispatcher关闭文件描述。
再来看其他方法,
//获取本地地址
 public SocketAddress getLocalAddress()
        throws IOException
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(!isOpen())
        throw new ClosedChannelException();
    return localAddress;
    Exception exception;
    exception;
    throw exception;
}
//获取远端地址
public SocketAddress getRemoteAddress()
    throws IOException
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(!isOpen())
        throw new ClosedChannelException();
    return remoteAddress;
    Exception exception;
    exception;
    throw exception;
}

//获取通道报文socket
public DatagramSocket socket()
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(socket == null)
        //委托给DatagramSocketAdaptor,根据通道创建报文socket
        socket = DatagramSocketAdaptor.create(this);
    return socket;
    Exception exception;
    exception;
    throw exception;
}

//DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
public class DatagramSocketAdaptor extends DatagramSocket
{
    private final DatagramChannelImpl dc;//报文通道
    private volatile int timeout;
    private static final DatagramSocketImpl dummyDatagramSocket = new DatagramSocketImpl() {
        protected void create()
            throws SocketException
        {
        }
        protected void bind(int i, InetAddress inetaddress)
            throws SocketException
        {
        }

        protected void send(DatagramPacket datagrampacket)
            throws IOException
        {
        }
        protected int peek(InetAddress inetaddress)
            throws IOException
        {
            return 0;
        }
        protected int peekData(DatagramPacket datagrampacket)
            throws IOException
        {
            return 0;
        }
        protected void receive(DatagramPacket datagrampacket)
            throws IOException
        {
        }
        protected void setTTL(byte byte0)
            throws IOException
        {
        }
        protected byte getTTL()
            throws IOException
        {
            return 0;
        }
        protected void setTimeToLive(int i)
            throws IOException
        {
        }
        protected int getTimeToLive()
            throws IOException
        {
            return 0;
        }
        protected void join(InetAddress inetaddress)
            throws IOException
        {
        }
        protected void leave(InetAddress inetaddress)
            throws IOException
        {
        }
        protected void joinGroup(SocketAddress socketaddress, NetworkInterface networkinterface)
            throws IOException
        {
        }
        protected void leaveGroup(SocketAddress socketaddress, NetworkInterface networkinterface)
            throws IOException
        {
        }
        protected void close()
        {
        }
        public Object getOption(int i)
            throws SocketException
        {
            return null;
        }
        public void setOption(int i, Object obj)
            throws SocketException
        {
        }

    };

   }
   //构造报文socket适配器
    private DatagramSocketAdaptor(DatagramChannelImpl datagramchannelimpl)
        throws IOException
    {
        super(dummyDatagramSocket);
        timeout = 0;
        dc = datagramchannelimpl;
    }
    public void bind(SocketAddress socketaddress)
        throws SocketException
    {
        try
        {
            if(socketaddress == null)
                socketaddress = new InetSocketAddress(0);
            //委托为报文通道
            dc.bind(socketaddress);
        }
        catch(Exception exception)
        {
            Net.translateToSocketException(exception);
        }
    }
    public void close()
    {
        try
        {
	    //委托为报文通道
            dc.close();
        }
        catch(IOException ioexception)
        {
            throw new Error(ioexception);
        }
    }
    ...
}


从上面可以看出,获取通道报文socket,实际上返回的报文通道适配器DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
//确保通道打开
private void ensureOpen()
        throws ClosedChannelException
    {
        if(!isOpen())
            throw new ClosedChannelException();
        else
            return;
    }
//通道是否连接
public boolean isConnected()
    {
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        return state == 1;
        Exception exception;
        exception;
        throw exception;
    }
//确保通道打开,且未连接
    void ensureOpenAndUnconnected()
        throws IOException
    {
        synchronized(stateLock)
        {
            if(!isOpen())
                throw new ClosedChannelException();
            if(state != 0)
                throw new IllegalStateException("Connect already invoked");
        }
   }
 //finalize
 protected void finalize()
        throws IOException
    {
        if(fd != null)
            close();
    }
//设置就绪事件
public boolean translateAndSetReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
{
    return translateReadyOps(i, 0, selectionkeyimpl);
}
//更新就绪事件
 public boolean translateAndUpdateReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
{
    return translateReadyOps(i, selectionkeyimpl.nioReadyOps(), selectionkeyimpl);
}
public boolean translateReadyOps(int i, int j, SelectionKeyImpl selectionkeyimpl)
{
    int k = selectionkeyimpl.nioInterestOps();
    int l = selectionkeyimpl.nioReadyOps();
    int i1 = j;
    //就绪事件为读1写4连接8,接受连接事件16,不是这四种事件,则返回false
    if((i & 32) != 0)
        return false;
    //下面的这段24,16不是很明白,理解的网友可以给我留言,一起探讨,
    //莫非为8+16,接受连接,并建立连接
    if((i & 24) != 0)
    {
        i1 = k;
        selectionkeyimpl.nioReadyOps(i1);
        return (i1 & ~l) != 0;
    }
    if((i & 1) != 0 && (k & 1) != 0)
        i1 |= 1;//读事件,已连接
    if((i & 4) != 0 && (k & 4) != 0)
        i1 |= 4;//写事件
    selectionkeyimpl.nioReadyOps(i1);
    return (i1 & ~l) != 0;
}
 //设置通道兴趣事件
 public void translateAndSetInterestOps(int i, SelectionKeyImpl selectionkeyimpl)
 {
     int j = 0;
     if((i & 1) != 0)
         j |= 1;//读事件
     if((i & 4) != 0)
         j |= 4;//写事件
     if((i & 8) != 0)
         j |= 2;//连接事件
     selectionkeyimpl.selector.putEventOps(selectionkeyimpl, j);
 }
//通道支持配置项
public final Set supportedOptions()
    {
        return DefaultOptionsHolder.defaultOptions;
    }
 private static class DefaultOptionsHolder
    { 
        private DefaultOptionsHolder()
        {
        }
	static final Set defaultOptions = defaultOptions();
        private static Set defaultOptions()
        {
            HashSet hashset = new HashSet(8);
            hashset.add(StandardSocketOptions.SO_SNDBUF);//发送缓冲区
            hashset.add(StandardSocketOptions.SO_RCVBUF);//接受缓存区
            hashset.add(StandardSocketOptions.SO_REUSEADDR);//地址重用
            hashset.add(StandardSocketOptions.SO_BROADCAST);//是否支持报文广播传输
            hashset.add(StandardSocketOptions.IP_TOS);//网络协议服务类型
            hashset.add(StandardSocketOptions.IP_MULTICAST_IF);//多播网络接口
            hashset.add(StandardSocketOptions.IP_MULTICAST_TTL);//多播报文存活时间
            hashset.add(StandardSocketOptions.IP_MULTICAST_LOOP);//是否支持多播环路地址
            return Collections.unmodifiableSet(hashset);
        }
       
    }
//配置选项
 public DatagramChannel setOption(SocketOption socketoption, Object obj)
        throws IOException
    {
        if(socketoption == null)
            throw new NullPointerException();
	//如果配置为通道非支持配置选项
        if(!supportedOptions().contains(socketoption))
            throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString());
        Object obj1 = stateLock;
        JVM INSTR monitorenter ;
        ensureOpen();
        if(socketoption != StandardSocketOptions.IP_TOS)
            break MISSING_BLOCK_LABEL_102;
        if(family == StandardProtocolFamily.INET)
	    //委托给Net
            Net.setSocketOption(fd, family, socketoption, obj);
        return this;
	//配置选项非多播报文存活时间,是否支持多播环路地址,调到L2,否则L1
        if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP) 
	       goto _L2; else goto _L1
_L1:
        Net.setSocketOption(fd, family, socketoption, obj);
        this;
        obj1;
        JVM INSTR monitorexit ;
        return;
_L2:
        if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L4; else goto _L3
_L3:   //配置选项为多播网络接口
        if(obj == null)
            throw new IllegalArgumentException("Cannot set IP_MULTICAST_IF to 'null'");
        NetworkInterface networkinterface = (NetworkInterface)obj;
        if(family == StandardProtocolFamily.INET6)
        {
            int i = networkinterface.getIndex();
            if(i == -1)
                throw new IOException("Network interface cannot be identified");
            //配置文件描述网络接口
            Net.setInterface6(fd, i);
        } else
        {
            Inet4Address inet4address = Net.anyInet4Address(networkinterface);
            if(inet4address == null)
                throw new IOException("Network interface not configured for IPv4");
            int j = Net.inet4AsInt(inet4address);
	    //配置文件描述网络接口
            Net.setInterface4(fd, j);
        }
        this;
        obj1;
        JVM INSTR monitorexit ;
        return;
_L4:
        Net.setSocketOption(fd, Net.UNSPEC, socketoption, obj);
      ...
    }
//获取配置项
 public Object getOption(SocketOption socketoption)
        throws IOException
    {
        if(socketoption == null)
            throw new NullPointerException();
	//如果配置为通道非支持配置选项
        if(!supportedOptions().contains(socketoption))
            throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString());
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        ensureOpen();
	//网络服务类型配置选项,是调到L2,否调到L1
        if(socketoption != StandardSocketOptions.IP_TOS) goto _L2; else goto _L1
_L1:
        if(family == StandardProtocolFamily.INET)
	    //委托给Net,获取选项配置
            return Net.getSocketOption(fd, family, socketoption);
        Integer.valueOf(0);
        obj;
        JVM INSTR monitorexit ;
        return;
_L2:   //配置选项非多播报文存活时间,是否支持多播环路地址,调到L4,否则L3
        if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP)
	    goto _L4; else goto _L3
_L3:    //委托给Net,获取选项配置
        Net.getSocketOption(fd, family, socketoption);
        obj;
        JVM INSTR monitorexit ;
        return;
_L4:    //配置选项非网络接口,调到L6,否则L5
        if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L6; else goto _L5
	//下面的就不看了,与setOptions中的思路是逆向的
_L5:
        if(family != StandardProtocolFamily.INET) goto _L8; else goto _L7
_L7:
        int i = Net.getInterface4(fd);
        if(i != 0) goto _L10; else goto _L9
_L9:
        null;
        obj;
        JVM INSTR monitorexit ;
        return;
_L10:
        NetworkInterface networkinterface1;
        InetAddress inetaddress = Net.inet4FromInt(i);
        networkinterface1 = NetworkInterface.getByInetAddress(inetaddress);
        if(networkinterface1 == null)
            throw new IOException("Unable to map address to interface");
        networkinterface1;
        obj;
        JVM INSTR monitorexit ;
        return;
_L8:
        i = Net.getInterface6(fd);
        if(i != 0) goto _L12; else goto _L11
_L11:
        null;
        obj;
        JVM INSTR monitorexit ;
        return;
_L12:
        NetworkInterface networkinterface;
        networkinterface = NetworkInterface.getByIndex(i);
        if(networkinterface == null)
            throw new IOException("Unable to map index to interface");
        networkinterface;
        obj;
        JVM INSTR monitorexit ;
        return;
_L6:
        Net.getSocketOption(fd, Net.UNSPEC, socketoption);
        obj;
        JVM INSTR monitorexit ;
        return;
        Exception exception;
        exception;
        throw exception;
    }

总结;
关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;委托报文分发器DatagramDispatcher关闭文件描述。

附:
//DatagramSocketAdaptor







//DatagramDispatcher
class DatagramDispatcher extends NativeDispatcher
{
        static 
    {
        Util.load();
    }
    DatagramDispatcher()
    {
    }
    int read(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return read0(filedescriptor, l, i);
    }
    long readv(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return readv0(filedescriptor, l, i);
    }
    int write(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return write0(filedescriptor, l, i);
    }
    long writev(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return writev0(filedescriptor, l, i);
    }
    void close(FileDescriptor filedescriptor)
        throws IOException
    {
        SocketDispatcher.close0(filedescriptor);
    }
    static native int read0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native long readv0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native int write0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native long writev0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
}

猜你喜欢

转载自donald-draper.iteye.com/blog/2373519