Android端连接ActiveMQ服务器

记录一下Android作为客户端去连接ActiveMQ服务时,存在的问题。

MQTT

mqtt比较简单,有较为成熟的库MqttAndroidClient

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}


dependencies {
    compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

别忘记配置manifest:

        <service android:name="org.eclipse.paho.android.service.MqttService"/>

大致写法一目了然:

MqttAndroidClient mqClient = new MqttAndroidClient(context, TextFormmater.getAddress(config.serverUrl), TextUtils.isEmpty(config.clientId) ? TextFormmater.getClientId(AppMonitor.MAC) : config.clientId);
		MqttConnectOptions connectOptions = new MqttConnectOptions();
		connectOptions.setAutomaticReconnect(true);
		connectOptions.setCleanSession(true);
		connectOptions.setConnectionTimeout(10);
		connectOptions.setKeepAliveInterval(30);
		connectOptions.setUserName(username);
		connectOptions.setPassword(password.toCharArray());
		
		try {
    
    
			//后续监听
			mqClient.setCallback(new MqttCallback() {
    
    
				@Override
				public void connectionLost(Throwable cause) {
    
    
					if (cause != null) {
    
    
						cause.printStackTrace();
					}
				}

				@Override
				public void messageArrived(String topic, MqttMessage message) throws Exception {
    
    
					//接收消息
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken token) {
    
    
					//发送、提交完成
				}
			});
			//连接
			mqClient.connect(connectOptions, null, new IMqttActionListener() {
    
    
				@Override
				public void onSuccess(IMqttToken asyncActionToken) {
    
    
					L.w("连接成功," + asyncActionToken.getClient().getServerURI());
					//订阅主题
					mqClient.subscribe(topic, 0, null, new IMqttActionListener() {
    
    
					@Override
					public void onSuccess(IMqttToken asyncActionToken) {
    
    

					}

					@Override
					public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    
    
					}
				});
				}

				@Override
				public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    
    
					exception.printStackTrace();
				}
			});
		} catch (MqttException e) {
    
    
			e.printStackTrace();

		}

OpenWire

OpenWire比较麻烦,虽然也有成熟的库ActiveMQ,但一般用于J2EE服务端。
当然我们理论上是可以使用其activemq-client的包,就当成普通的JMS来使用:

implementation 'org.apache.activemq:activemq-client:5.16.3'

远程仓库导入的好处可以让其自行下载本身的依赖,否则本地导入是需要在activemq里找出好几个jar包来补足的。
当然事情没那么顺利。

编译运行会报错

错误: 无法访问Referenceable
找不到javax.naming.Referenceable的类文件

因为移动端使用是Java SE的JDK,有所缺失,所以添加需要JNDI的支持,但这个库基本找不到正常的远程仓库,只能在一些不常见的网站中找到:

http://www.java2s.com/Code/Jar/j/Downloadjndi121jar.htm
下载下来,加入依赖,至少能运行起来了。

然后在创建连接时又会遇到下面这个错误

 java.lang.NoClassDefFoundError: org.apache.activemq.TransactionContext

缺少的是这个TransactionContext的接口XAResource:

public class TransactionContext implements XAResource

同样是SE版没有的东西,所以继续找补;
好在这个在jta中就能找到,可以远程导入:

    implementation 'javax.transaction:jta:1.1'

这回总算找齐了,看点服务端使用JMS的案例,就可以直接操作了:

OpenWireClient client = new OpenWireClient.Builder()
				.url("tcp://192.168.21.104:61616")
				.username("xxx")
				.password("dddddd")
				.clientId("desk")
				.build();
		//在连接前设定回调
		client.setCallback(new OpenWireClient.OpenWireCallback() {
    
    
			@Override
			public void subscribeFinish(String topic) {
    
    
				System.out.println("订阅完成, " + topic);
			}

			@Override
			public void connectionCreated(BrokerInfo brokerInfo) {
    
    
				System.out.println("连接成功, " + brokerInfo.toString());
				try {
    
    
					client.subscribe("xter", "xter", false);
				} catch (JMSException e) {
    
    
					e.printStackTrace();
				}
			}

			@Override
			public void messageArrived(String topic, Message message) throws Exception {
    
    
				if (message instanceof TextMessage) {
    
    
					//文字消息接收
					TextMessage textMessage = (TextMessage) message;
					//接收消息
					String msg = textMessage.getText();
					System.out.println("收到[" + topic + "]的消息, " + msg);
				} else if (message instanceof BytesMessage) {
    
    
					//字节消息接收
					BytesMessage bytesMessage = (BytesMessage) message;
					long len = bytesMessage.getBodyLength();
					byte[] bytes = new byte[(int) len];
					bytesMessage.readBytes(bytes);
					String msg = new String(bytes);
					System.out.println("收到[" + topic + "]的消息, " + msg);
				}
			}

			@Override
			public void onException(Exception ex) {
    
    
				ex.printStackTrace();
			}
		});
		try {
    
    
			client.connect();
			TimeUnit.SECONDS.sleep(20);
			client.disconnect();
		} catch (JMSException | InterruptedException e) {
    
    
			e.printStackTrace();
		}

OpenWireClient是依照MqttAndroidClient写的一个简单封装,源码在这里

猜你喜欢

转载自blog.csdn.net/ifmylove2011/article/details/122358973