记录一下Android作为客户端去连接ActiveMQ服务时,存在的问题。
MQTT
mqtt比较简单,有较为成熟的库MqttAndroidClient;
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
别忘记配置manifest:
<service android:name="org.eclipse.paho.android.service.MqttService"/>
大致写法一目了然:
MqttAndroidClient mqClient = new MqttAndroidClient(context, TextFormmater.getAddress(config.serverUrl), TextUtils.isEmpty(config.clientId) ? TextFormmater.getClientId(AppMonitor.MAC) : config.clientId);
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setAutomaticReconnect(true);
connectOptions.setCleanSession(true);
connectOptions.setConnectionTimeout(10);
connectOptions.setKeepAliveInterval(30);
connectOptions.setUserName(username);
connectOptions.setPassword(password.toCharArray());
try {
//后续监听
mqClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
if (cause != null) {
cause.printStackTrace();
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
//接收消息
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//发送、提交完成
}
});
//连接
mqClient.connect(connectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
L.w("连接成功," + asyncActionToken.getClient().getServerURI());
//订阅主题
mqClient.subscribe(topic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
}
});
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
OpenWire
OpenWire比较麻烦,虽然也有成熟的库ActiveMQ,但一般用于J2EE服务端。
当然我们理论上是可以使用其activemq-client的包,就当成普通的JMS来使用:
implementation 'org.apache.activemq:activemq-client:5.16.3'
远程仓库导入的好处可以让其自行下载本身的依赖,否则本地导入是需要在activemq里找出好几个jar包来补足的。
当然事情没那么顺利。
编译运行会报错:
错误: 无法访问Referenceable
找不到javax.naming.Referenceable的类文件
因为移动端使用是Java SE的JDK,有所缺失,所以添加需要JNDI的支持,但这个库基本找不到正常的远程仓库,只能在一些不常见的网站中找到:
http://www.java2s.com/Code/Jar/j/Downloadjndi121jar.htm
下载下来,加入依赖,至少能运行起来了。
然后在创建连接时又会遇到下面这个错误:
java.lang.NoClassDefFoundError: org.apache.activemq.TransactionContext
缺少的是这个TransactionContext的接口XAResource:
public class TransactionContext implements XAResource
同样是SE版没有的东西,所以继续找补;
好在这个在jta中就能找到,可以远程导入:
implementation 'javax.transaction:jta:1.1'
这回总算找齐了,看点服务端使用JMS的案例,就可以直接操作了:
OpenWireClient client = new OpenWireClient.Builder()
.url("tcp://192.168.21.104:61616")
.username("xxx")
.password("dddddd")
.clientId("desk")
.build();
//在连接前设定回调
client.setCallback(new OpenWireClient.OpenWireCallback() {
@Override
public void subscribeFinish(String topic) {
System.out.println("订阅完成, " + topic);
}
@Override
public void connectionCreated(BrokerInfo brokerInfo) {
System.out.println("连接成功, " + brokerInfo.toString());
try {
client.subscribe("xter", "xter", false);
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void messageArrived(String topic, Message message) throws Exception {
if (message instanceof TextMessage) {
//文字消息接收
TextMessage textMessage = (TextMessage) message;
//接收消息
String msg = textMessage.getText();
System.out.println("收到[" + topic + "]的消息, " + msg);
} else if (message instanceof BytesMessage) {
//字节消息接收
BytesMessage bytesMessage = (BytesMessage) message;
long len = bytesMessage.getBodyLength();
byte[] bytes = new byte[(int) len];
bytesMessage.readBytes(bytes);
String msg = new String(bytes);
System.out.println("收到[" + topic + "]的消息, " + msg);
}
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
});
try {
client.connect();
TimeUnit.SECONDS.sleep(20);
client.disconnect();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
OpenWireClient是依照MqttAndroidClient写的一个简单封装,源码在这里。