java udp分别用DatagramSocket和DatagramChannel实现多计算机接收广播数据

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


DatagramSocket实现

server端:

计算机A:

public class UdpClient2 implements Callable {
    
    
 
    @Override
    public Object call() throws Exception {
    
    
        DatagramSocket socket = new DatagramSocket(7777);
        byte[] array = new byte[1024];
        DatagramPacket mypackt = new DatagramPacket(array, array.length);

        try {
    
    
            while (true) {
    
    
// 读取Socket中的数据,读到的数据放在inPacket所封装的字节数组里。
                socket.receive(mypackt);
                String receiveStr = new String(array, 0, mypackt.getLength());
// 打印输出从socket中读取的内容
                System.out.println("信息:" + receiveStr);
                return receiveStr;
            }
        }
// 捕捉异常
        catch (IOException ex) {
    
    
            ex.printStackTrace();
            try {
    
    
                if (socket != null) {
    
    
// 让该Socket离开该多点IP广播地址
                    //      socket.leaveGroup(broadcastAddress);
// 关闭该Socket对象
                    socket.close();
                }

            } catch (Exception e) {
    
    
                e.printStackTrace();
                return null;
            }
            return null;
        }
    }
    }

main函数:

UdpClient2 udpClient = new UdpClient2 ();=
     UdpClient2 udpClient = SpringUtils.getBean(UdpClient2.class);

        FutureTask<String> udpfuture = new FutureTask<String>(udpClient);
//new Thread(udpfuture).start();
POOL_EXECUTOR.submit(udpfuture);
        System.out.println(udpfuture.get()); FutureTask的get方法会自动阻塞,直到获取计算结果为止

在另一台计算机B上也执行相同的代码 (同一个局域网)

客户端:

public class Client1 {
    
    
    public static void main(String[] args) throws IOException {
    
    
        DatagramSocket socket = new DatagramSocket();
        socket.setBroadcast(true);
        socket.connect(InetAddress.getByName("192.168.1.255"),7777);
        String string = "fsdfsd1234";
        byte[]newArray = string.getBytes(StandardCharsets.UTF_8);
        DatagramPacket packet = new DatagramPacket(newArray,newArray.length);
        socket.send(packet);
        socket.close();
    }
}

运行两个服务端:
可以看到结果:
在这里插入图片描述
在Windows操作系统中,服务端可以使用代码

DatagramSocket socket = new DatagramSocket(new InetSocketAddress("192.168.0.150",
7777));DatagramSocket socket = new DatagramSocket(7777);

来开启服务端的服务,这样都能接收到客户端的UDP广播信息。 而在Linux中必须使用代码 DatagramSocket socket = new DatagramSocket(7777); 来开启服务端的服务。经过笔者的测试,如果在Linux中使用代码

  DatagramSocket socket = new DatagramSocket(new InetSocketAddress("192.168.0.150", 7777));

获得可用的广播地址

作为服务端,则Linux中的UDP服务器接收不到UDP广播消息。 另外,一定要留意广播地址,错误的广播地址不能实现UDP广播的效果。具体使用哪些广播地址,要根据当前计算机中的IP进行换算,可自行查看搜索引擎提供的相关资料,或使用如下程序代码获得。

在这里插入图片描述

DatagramChannel实现

服务端

public class UdpChannelServer implements Callable {
    
    

    @Override
    public Object call() throws Exception {
    
    
        DatagramChannel channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress("192.168.1.5",8888));
        Selector selector = Selector.open();
        SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_READ);
        boolean isRun = true;
        String str = null;
        while (isRun)
        {
    
    
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext())
            {
    
    
                SelectionKey key = iterator.next();
                if(key.isReadable())
                {
    
    
                    channel = (DatagramChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    channel.receive(buffer);
                    str = new String(buffer.array(),0,buffer.position());
                    System.out.println(str);
                }
                iterator.remove();
            }
        }
        channel.close();
        return str;
    }
}

run函数:

public class Run{
    
    
  private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2,
            1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),
            new ThreadPoolExecutor.CallerRunsPolicy());
 public static void main(String[] args){
    
    
        UdpChannelServer server = new UdpChannelServer();
        FutureTask<String> task2  = new FutureTask<String>(server);
        POOL_EXECUTOR.submit(task2);
        }

客户端

public class ChannelClient {
    
    
    private static Scanner scan = null;
  private static   ByteBuffer buffer = null;
    public static void main(String[] args) throws IOException {
    
    

                DatagramChannel channel = DatagramChannel.open();
                channel.configureBlocking(false);

                Selector selector = Selector.open();
                SelectionKey selectionKey1 = channel.register(selector, SelectionKey.
                        OP_WRITE);
                int keyCount = selector.select();
                Set<SelectionKey> selectedKeysSet = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeysSet.iterator();
                while (iterator.hasNext()) {
    
    
                    SelectionKey key = iterator.next();
                    if (key.isWritable()) {
    
    
                        scan = new Scanner(System.in);
// 不断读取键盘输入
                        while (scan.hasNextLine()) {
    
    
// 将键盘输入的一行字符串转换字节数组
                            byte[] buff = scan.nextLine().getBytes();
// 设置发送用的DatagramPacket里的字节数据

                            buffer = ByteBuffer.wrap(buff);
                            // 如果在两台物理计算机中进行实验,则要把localhost改成客户端的IP地址
                            channel.send(buffer, new InetSocketAddress("192.168.1.5", 8888));
                        }


                        channel.close();
                    }
                }
                System.out.println("client end !");
            }

        }



结果:
在这里插入图片描述
服务端显示收到的数据:
在这里插入图片描述

将通道加入组播地址

注意,首先在Linux中使用命令 systemctl stop firewalld.service
关闭防火墙,然后屏蔽服务端上多余的网卡。 MembershipKey join(InetAddress group,NetworkInterface interf)方法的作用是将通道加入到组播地址中。
创建测试用的代码,本类需要运行在计算机A中

ublic class Test1_5 {
    
    
public static void main(String[] args) throws IOException, InterruptedException {
    
    
    DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET);
    channel.join(InetAddress.getByName("224.0.0.5"),
            NetworkInterface.getByInetAddress(InetAddress.getByName("192.168.0.150")));
    // 必须执行bind操作,不然客户端发送数据本类接收不到
    channel.bind(new InetSocketAddress("192.168.0.150", 8088));
    channel.configureBlocking(false);
    Selector selector = Selector.open();
    SelectionKey selectionKey1 = channel.register(selector, SelectionKey.OP_READ);
    boolean isRun = true;
    while (isRun == true) {
    
    
        selector.select();
        Set<SelectionKey> selectedKeysSet = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectedKeysSet.iterator();
        while (iterator.hasNext()) {
    
    
            SelectionKey key = iterator.next();
            if (key.isReadable()) {
    
    
                channel = (DatagramChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1000);
                channel.receive(buffer);
                System.out.println(new String(buffer.array(), 0, buffer.position(), 
                "utf-8"));
            }
            iterator.remove();
        }
    }
    channel.close();
}

下面类需要运行在计算机B中

public class Test1_6 {
    
    
public static void main(String[] args) throws IOException, InterruptedException {
    
    
    DatagramChannel channel = DatagramChannel.open(StandardProtocolFamily.INET);
    channel.connect(new InetSocketAddress("224.0.0.5", 8088));
    channel.configureBlocking(false);
    Selector selector = Selector.open();
    SelectionKey selectionKey1 = channel.register(selector, SelectionKey.OP_WRITE);
    int keyCount = selector.select();
    Set<SelectionKey> selectedKeysSet = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectedKeysSet.iterator();
    while (iterator.hasNext()) {
    
    
        SelectionKey key = iterator.next();
        if (key.isWritable()) {
    
    
            ByteBuffer buffer = ByteBuffer.wrap("我来自客户端!".getBytes());
            channel.write(buffer);
            channel.close();
        }}
    System.out.println("client end !");
}
}

猜你喜欢

转载自blog.csdn.net/qq_41358574/article/details/120614970