使用IBM MQTTv3实现相关的发布订阅功能
MQTTv3的发布消息的实现:
package com.etrip.mqttv3;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;

/**
 * Publishes a fixed text message to an MQTT v3 broker.
 *
 * @author longgangbai
 */
public class MQTTPub {

    /**
     * Connects to the broker at {@code tcp://192.168.208.46:1883} with client id
     * {@code mqttserver-pub} and publishes the same QoS 1 message to the topic
     * {@code tokudu/china} over and over.
     *
     * <p>The publish loop has no exit condition, so this method only returns after an
     * exception has been thrown. The exception is printed to standard error and the
     * client is disconnected before returning.
     */
    public static void doTest() {
        MqttClient client = null;
        try {
            client = new MqttClient("tcp://192.168.208.46:1883", "mqttserver-pub");
            MqttTopic topic = client.getTopic("tokudu/china");

            // Encode with an explicit charset so the payload bytes do not depend on the
            // platform default encoding of the machine running the publisher.
            MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes("UTF-8"));
            message.setQos(1);

            client.connect();

            while (true) {
                MqttDeliveryToken token = topic.publish(message);
                // Wait in one-second slices until the delivery of this message has
                // completed before publishing the next one.
                while (!token.isComplete()) {
                    token.waitForCompletion(1000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            disconnectQuietly(client);
        }
    }

    /** Disconnects the client if it is connected; a failure here is only reported. */
    private static void disconnectQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
MQTTv3的订阅消息的实现:
package com.etrip.mqttv3; import com.ibm.micro.client.mqttv3.MqttClient; import com.ibm.micro.client.mqttv3.MqttConnectOptions; /** * MQTTV3的订阅消息类 * * @author longgangbai */ public class MQTTSubsribe { public static String doTest() { try { //创建MqttClient MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000"); //回调处理类 CallBack callback = new CallBack(); client.setCallback(callback); //创建连接可选项信息 MqttConnectOptions conOptions = new MqttConnectOptions(); // conOptions.setCleanSession(false); //连接broker client.connect(conOptions); //发布相关的订阅 client.subscribe("tokudu/china", 1); //client.disconnect(); } catch (Exception e) { e.printStackTrace(); return "failed"; } return "success"; } }
回调处理类(用于处理订阅到的消息):
package com.etrip.mqttv3;

import com.ibm.micro.client.mqttv3.MqttCallback;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;

/**
 * Callback handler for the subscribing client; prints every message it is given.
 *
 * @author longgangbai
 */
public class CallBack implements MqttCallback {

    public CallBack() {
    }

    /**
     * Prints the string form of the received message to standard output.
     *
     * @param topic   the topic passed in by the client library (not used here)
     * @param message the received message
     */
    public void messageArrived(MqttTopic topic, MqttMessage message) {
        try {
            System.out.println(" MQTTSubsribe message.toString()" + message.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reports the loss of the connection on standard error instead of ignoring it, so
     * that a subscriber which stops receiving messages leaves a visible trace.
     *
     * @param cause the reason passed in by the client library; may be {@code null}
     */
    public void connectionLost(Throwable cause) {
        System.err.println("MQTT connection lost");
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    /**
     * Intentionally does nothing.
     *
     * @param token the delivery token passed in by the client library (not used here)
     */
    public void deliveryComplete(MqttDeliveryToken token) {
    }
}
测试类:
package com.etrip.mqttv3; /** * MQTTV3的测试类 * * @author longgangbai */ public class MQTTMain { public static void main(String[] args) { //订阅消息的方法 MQTTSubsribe.doTest(); //发布消息的类 MQTTPub.doTest(); } }