发现服务器
DiscoverServer
package com.gbcom.ccsv3.transport.multidiscover; import org.apache.log4j.Logger; /** * 发现服务器 * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:23:36 * <p> * @version v1.0.0 * <p> * @see com.gbcom.ccsv3.transport.multidiscover.DiscoverServer */ public class DiscoverServer { private static final Logger LOGGER = Logger.getLogger(DiscoverServer.class); private static class DiscoverServerHolder{ private static final DiscoverServer INSTANCE = new DiscoverServer(); } /** * 获取单例 * * @return DiscoverServer */ public static DiscoverServer getInstance() { return DiscoverServerHolder.INSTANCE; } private boolean started = false; private Receiver discover; private DiscoverServer() { discover = UdpDiscoverFactory.getMultiUdpDiscover(); } /** * 开 */ public void on() { started = true; Thread t = new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub try { discover.start(); } catch (Exception e) { e.printStackTrace(); discover = null; LOGGER.error("start discover server unknoe host", e); started = false; // maybe throws new exception. } } }); t.start(); LOGGER.info("start discover server for device success!!!!"); } /** * 关 */ public void off() { if (discover != null) { discover.stop(); } started = false; } /** * 是否开启 * * @return started */ public boolean isStarted() { return started; } static class UdpDiscoverFactory { /** * 获取多播发现者 * * @return Receiver */ public static Receiver getMultiUdpDiscover() { return new MultiReceiver(); } /** * 获取单播发现者 * * @return Receiver */ public static Receiver getUdpDiscover() { return new UniReceiver(); } } }
接受者
接口
package com.gbcom.ccsv3.transport.multidiscover; import java.net.UnknownHostException; /** * UDP 发现 * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:39:06 * <p> * @version v1.0.0 * <p> * @see com.gbcom.ccsv3.transport.multidiscover.Receiver */ public interface Receiver { /** * 开始 * * @throws UnknownHostException * Exception */ public void start() throws UnknownHostException; /** * 停止 */ public void stop(); /** * 是否开启 * * @return Boolean */ public boolean isStarted(); }
组播的实现
package com.gbcom.ccsv3.transport.multidiscover; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.UnknownHostException; import java.util.Date; import org.apache.log4j.Logger; /** * 多播发现者 * * * 组播 通信端口 1107 单播 通信端口1108 * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:25:33 * <p> * @version v1.0.0 * <p> * @see com.gbcom.ccsv3.transport.multidiscover.MultiReceiver */ public class MultiReceiver implements Receiver { private static final Logger LOGGER = Logger.getLogger(MultiReceiver.class); /** * 多播ip */ public static final String MULTI_GROUP_IP = "224.7.11.3"; /** * 多播端口 */ public static final int MULTI_GROUP_PORT = 1107; private MulticastSocket msr = null; private InetAddress group = null; private boolean started = false; /** * 开始 * * @throws UnknownHostException * Exception */ @Override public void start() throws UnknownHostException { // 创建多播socket // 接收报文 this.group = InetAddress.getByName(MULTI_GROUP_IP);// 组播地址 try { msr = new MulticastSocket(MULTI_GROUP_PORT); // server bind port //java.net.SocketException: No such device // at java.net.PlainDatagramSocketImpl.join(Native Method) // at java.net.PlainDatagramSocketImpl.join(PlainDatagramSocketImpl.java:181) // at java.net.MulticastSocket.joinGroup(MulticastSocket.java:277) // at com.gbcom.ccsv3.transport.multidiscover.MultiReceiver.start(MultiReceiver.java:56) // at com.gbcom.ccsv3.transport.multidiscover.DiscoverServer$1.run(DiscoverServer.java:50) // at java.lang.Thread.run(Thread.java:662) msr.joinGroup(group);// 加入连接 byte[] buffer = new byte[50]; LOGGER.info("Thread=" + Thread.currentThread() + " ; MultiReceiver started!!! (启动时间: " + new Date() + ")"); started = true; while (true) { try { // 建立一个指定缓冲区大小的数据包 DatagramPacket dp = new DatagramPacket(buffer, buffer.length); msr.receive(dp); DpDispatcher.getInstance().addDp(dp); } catch (Exception e) { LOGGER.error("receiver is error , continue", e); continue; } } } catch (Exception e) { e.printStackTrace(); LOGGER.error("MultiDiscover --- start -- error", e); } finally { if (msr != null) { try { msr.leaveGroup(group); msr.close(); } catch (Exception e) { LOGGER.error("MultiDiscover --- start finall -- error", e); } } } } /** * 停止 */ @Override public void stop() { if (msr != null) { try { msr.leaveGroup(group); msr.close(); } catch (Exception e) { LOGGER.error("MultiDiscover --- start finall -- error", e); } } started = false; } /** * 是否开启 * * @return started */ @Override public boolean isStarted() { return started; } }
发送者
package com.gbcom.ccsv3.transport.multidiscover; import java.net.InetAddress; import java.net.UnknownHostException; /** * UDP 发现 * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:39:06 * <p> * @version v1.0.0 * <p> * @see Sender */ public interface Sender { /** * 发送消息。 添加ip 和port 兼容单播处理,作为单播的 端口和地址, * * @param msg * String * @param ip * InetAddress * @param port * int * @throws UnknownHostException * Exception */ public void send(String msg, InetAddress ip, int port) throws UnknownHostException; }
组播实现
package com.gbcom.ccsv3.transport.multidiscover; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; import java.net.UnknownHostException; import org.apache.log4j.Logger; /** * 多播发送者,单利 和 静态类 * 组播 通信端口 1107 单播 通信端口1108 * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:25:33 * <p> * @version v1.0.0 * <p> * @see MultiSender */ public final class MultiSender implements Sender { private static final Logger LOGGER = Logger.getLogger(MultiSender.class); /** * 多播ip */ public static final String MULTI_GROUP_IP = "224.7.11.3"; /** *多播端口 */ public static final int MULTI_GROUP_PORT = 1108; private static final MultiSender INSTANCE = new MultiSender(); /** * 单例模式,获取单例 * * @return MultiSender */ public static MultiSender getInstance() { return INSTANCE; } private MultiSender() { } /** * @param msg * String * @param ip * InetAddress * @param port * int * @throws UnknownHostException * Exception */ @Override public void send(String msg, InetAddress ip, int port) throws UnknownHostException { InetAddress group = InetAddress.getByName(MULTI_GROUP_IP);// 组播地址 MulticastSocket mss = null; try { // mss = new MulticastSocket(MULTI_GROUP_PORT); mss = new MulticastSocket(); // 随机端口 client mss.joinGroup(group); byte[] buffer = msg.getBytes(); DatagramPacket dp = new DatagramPacket(buffer, buffer.length, group, MULTI_GROUP_PORT); mss.send(dp); } catch (Exception e) { e.printStackTrace(); LOGGER.error("MultiSender -- send", e); } finally { try { if (mss != null) { mss.leaveGroup(group); mss.close(); } } catch (Exception e) { LOGGER.error("MultiSender -- send -- final", e); } } } }
处理器实现
package com.gbcom.ccsv3.transport.multidiscover; import java.net.DatagramPacket; import java.net.InetAddress; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import com.gbcom.omc.si.common.Const; /** * 转发器,接收dp * * <p> * * @author syz * <p> * @date 2015-6-26,下午04:29:42 * <p> * @version v1.0.0 * <p> * @see com.gbcom.ccsv3.transport.multidiscover.DpDispatcher */ public class DpDispatcher { private static final Logger LOG = Logger.getLogger(DpDispatcher.class); private static final int THREAD_NUM = 1; private static final int BLOCK_QUEUE_MAX_SIZE = 10000; private static final int BLOCK_QUEUE_CLEAR_SIZE = 10000; /** * 线程的执行器 */ private ExecutorService executor = null; private boolean isRunning = false; /** * 上报Trap消息的队列 :SIZE */ private BlockingQueue<DatagramPacket> dpQueue = new LinkedBlockingQueue<DatagramPacket>( BLOCK_QUEUE_MAX_SIZE); private static class DpDispatcherHolder { private static final DpDispatcher INSTANCE = new DpDispatcher(); } /** * 获取单例对象 * * @return TaskDispatcher */ public static DpDispatcher getInstance() { return DpDispatcherHolder.INSTANCE; } private DpDispatcher() { init(); start(); } private void init() { isRunning = false; } /** * 添加数据包 * * @param dp * DatagramPacket */ public void addDp(DatagramPacket dp) { if (!isRunning) { LOG .error("UdpDispatcher is not running, the Task below may not process"); } if (LOG.isDebugEnabled()) { LOG.debug("add DatagramPacket to Queue,, Address=" + dp.getAddress()); } try { if (dpQueue.size() >= BLOCK_QUEUE_CLEAR_SIZE) { LOG .info(" *****cleart request Task***** trap queue size is more than " + BLOCK_QUEUE_CLEAR_SIZE + ";; CLEAR BlockingQueue"); dpQueue.clear(); } dpQueue.put(dp); } catch (InterruptedException e) { LOG.info("/******* add dp InterruptedException*********/"); LOG.error("add dp to queue interrupted", e); LOG.info("/******* add dp InterruptedException *********/"); } catch (Exception e) { LOG.error("Other Exception ", e); } } /** * 停止 */ public void stop() { executor.shutdownNow(); isRunning = false; } /** * 开始 */ public void start() { executor = Executors.newCachedThreadPool(); for (int i = 0; i < THREAD_NUM; i++) { executor.execute(new DispatcherTask()); } isRunning = true; LOG.info("do Dispatcher task start , current thread size = " + THREAD_NUM); } class DispatcherTask implements Runnable { /** * 线程执行方法 */ @Override public void run() { DatagramPacket dp = null; while (!Thread.currentThread().isInterrupted()) { try { long begin = System.currentTimeMillis(); dp = dpQueue.take(); String s = new String(dp.getData(), 0, dp.getLength()); LOG.info("discover receiver dp , msg=" + s + ",dpQueue size=" + dpQueue.size()); if (s.equalsIgnoreCase("who")) { /* * TransportMapping mapping * =SnmpSingleton.getTransportMapping(); if(mapping * instanceof DefaultUdpTransportMapping){ String ip = * ((DefaultUdpTransportMapping)mapping).getAddress(). * getInetAddress().toString(); * SenderFactory.getMultiSender().send(ip); } */ String ip = "NULL"; int port = 162; if (Const.sourceSnmpIp == null || Const.sourceSnmpIp.trim().equals("")) { ip = InetAddress.getLocalHost().getHostAddress() .toString(); } else { String[] udpSrc = (Const.sourceSnmpIp.trim()) .split("/"); if (udpSrc.length < 1 || udpSrc.length > 2) { ip = InetAddress.getLocalHost() .getHostAddress().toString(); } else { ip = udpSrc[0]; port = (udpSrc.length == 2) ? Integer .parseInt(udpSrc[1]) : 162; } } String msg = "IP:" + ip + "," + "PORT:" + port; // InetAddress addr = // InetAddress.getByName(MultiSender.MULTI_GROUP_IP); // SenderFactory.getMultiSender().send(msg,MultiSender.MULTI_GROUP_IP,MultiSender.MULTI_GROUP_PORT); SenderFactory.getUniSender().send(msg, dp.getAddress(), dp.getPort()); } else { // LOG.error("OTHER INFOR---"+s); } if (LOG.isDebugEnabled()) { LOG.info("process Task success, thread=" + Thread.currentThread().getName() + " ;spend time :total= " + ((System.currentTimeMillis() - begin) / 1000) + "s || the queue size is not actually:" + dpQueue.size()); } } catch (InterruptedException e) { LOG .info("/******* DP Dispatcher InterruptedException*********/"); LOG.error("DP Dispatcher thread interrupted ;; tread = " + Thread.currentThread().getName(), e); LOG .info("/******* DP Dispatcher InterruptedException*********/"); Thread.currentThread().interrupt(); break; } catch (Exception e) { LOG.error("DP Dispatcher thread exception", e); continue; } } } } public static class SenderFactory { /** * 获取多播发送者 * * @return Sender */ public static Sender getMultiSender() { return MultiSender.getInstance(); } /** * 获取单播发送者 * * @return UniSender */ public static Sender getUniSender() { return UniSender.getInstance(); } } }
使用 打开发现服务器,,扩展处理器模块即可。
示例代码仅支持udp,完整代码参考附件。