在使用paho的包批量发送mqtt消息(QOS为0)时,跑了一定的时间后就会报错Too many publishes in progress (32202)。
所使用包
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
报错地方
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
......
当检查到actualInFlight大于maxInflight时,会报Too many publishes in progress (32202)
尝试将maxInflight值设大,但依旧会报错。
观察源码的发布流程
- 当调用MqttClient的publish方法时,并没有真正的发布消息,而是检查是否可以发布消息(actualInFlight小于maxInflight),如果可以则将消息放进pendingMessages。
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
······
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
······
实际上是谁在处理pendingMessages队列里的消息呢
- 跟踪MqttClient的connect方法,实际会启动一个ConnectBG
ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
conbg.start();
- 观察ConnectBG的run方法
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
发现会启用一个单独的线程去消费pendingMessages里的消息,发送到MQTT服务器
public void run() {
.....
message = clientState.get();
if (message != null) {
.....
MqttToken token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
.....
}
clientState.get()中会从pendingMessages中取一个消息,并使actualInFlight+1
在发送完消息后,clientState.notifySent(message)会判断如果QOS为0,则actualInFlight-1,不需要等待服务器回复。
以并发发送QOS为0的消息场景来说,在Sender线程中,会先从队列里取消息,actualInFlight+1,发送,然后actualInFlight-1,并且Sender线程是单线程的,理论上actualInFlight应该永远为0,不可能会超过maxInflight的
开启日志调试
实际上paho的源码里是有很多日志输出,于是想打开日志看看问题出在哪里
日志配置方法:https://blog.csdn.net/lblblblblzdx/article/details/81136922
在日志中搜索actualInFlight
发现actualInFlight会经历多次+1,才会经历一次-1观察actualInFlight+1和下一次actualInFlight+1之间经历了什么
当sender从pendingMessages取出消息后,actualInFlight+1,message不为空,打印key=0和message
紧接着,sender又从pendingMessages取出消息后,actualInFlight+1
初步判断,sender获取消息后因为某些原因报错了或者某些判断条件没有执行下去,导致没有了后面应有的日志。观察actualInFlight+1和下一次actualInFlight-1之间经历了什么
当sender从pendingMessages取出消息后,actualInFlight+1,message不为空,打印key=0和message
中间夹杂着许多次MqttClient的publish方法调用,然后是send,然后是saveToken
紧接着sender使actualInFlight-1,然后是removeToken从日志或者代码都可以知道QOS=0时key都为0,而tokens是存在一个map里,并发发送QOS=0的消息时,多次saveToken其实都只成功往tokens里放进一个值,但只要sender发送一次消息,就会removeToken,导致发送第二条QOS为0的消息时,获取不到token,直接放弃发送。
saveToken方法
protected void saveToken(MqttToken token, String key) {
final String methodName = "saveToken";
synchronized(tokens) {
//@TRACE 307=key={0} token={1}
log.fine(CLASS_NAME,methodName,"307",new Object[]{key,token.toString()});
token.internalTok.setKey(key);
this.tokens.put(key, token);
}
}
Sender的发送逻辑
MqttToken token = tokenStore.getToken(message);
// Whilequiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
结论
从以上过程中可得出结论,并发发送QOS=0的消息时,多次将消息加到pendingMessages队列并saveToken后,假若此时Sender线程获取到CPU资源,从pendingMessages队列获取消息并removeToken后,就会导致pendingMessages队列里其余消息获取不到token,导致发送失败,最终导致actualInFlight越来越大,程序报错Too many publishes in progress (32202)
解决方案
使用QOS=1