spark 生产线上碰到的问题

spark 生产线上碰到的问题

1、第一张错误信息图片
这里写图片描述
2、第二张错误信息图片
这里写图片描述
3、第三张错误信息图片
这里写图片描述
4、第四张sparkUI上显示的信息
这里写图片描述
一、产生事故的背景:
  上线一个疲劳驾驶事件处理的流式分析,由于中间业务处理的代码逻辑有变更,导致需要重新部署。每次都是将checkpoint中的元数据删掉,因为已经过了比较长的时间了,而且又用到updateStateByKey算子,担心数据算的不准所以删掉。每次重新部署上线的时候,第一个batch就处理时间特别长,然后接着就报上面的错误,然后导致excutor进程退出,最后spark程序挂掉。
二、分析事故产生的原因
  通过第四张图中看到每个task的数据比我理论估计的值多了很多,我设置了spark.streaming.kafka.maxRatePerPartition参数来控制,理论上是这个配置值*topic分区数*batch的时间。但是看到的比这个多了很多,目前还没看源码的能力,后面再看,估计是消费了从之前spark程序挂掉的结束时间到新部署spark程序的开始时间的数据,数据量一下子比预估多了很多,这就导致程序跑的非常慢,GC时间很长。同时我的业务处理中有对checkpoint中hdfs操作,导致有找不到(第三张图片)checkpoint中hdfs文件的错误(这个问题无法细节描述),至于(第二张图)FatchFailedException和集群上的资源不够也有关系。
三、最后解决的办法
  将checkpoint数据删除,换一个新的groupId,就可以正常消费了,正常groupId是不允许换的,由于我们公司业务这块没限制,就这样简单处理了。后面我查了一些资料有更好的办法!
  (1): spark.streaming.backpressure.enabled 默认是false,设置为true,就开启了背压机制。
  (2):spark.streaming.backpressure.initialRate默认是不限制,用来限制第一次批处理应该消费的数据,因为程序冷启动 队列里面有大量积压,防止第一次全部读取,造成系统阻塞。
  (3): spark.streaming.receiver.maxRate默认值没设置。每个接收器将接收数据的最大速率(每秒记录数)。 实际上,每个流每秒最多将消费此数量的记录。 将此配置设置为0或负数将不会对速率进行限制。
  备注:我用的receiver方式去消费kafka数据,所以才需要配置第三个,用Direct方式消费kafka数据配置前两个,加spark.streaming.kafka.maxRatePerPartition参数。
  最后:聊聊receiver模式和direct模式比较:
  receiver模式:通过配置线程数去消费kafka多个分区,partation和kafka的分区数无关,需要WAL(预写日志),这样一来性能会下降很多,而且数据多了一份在receiver那,offset这块会自动提交到zookeeper上去。
  direct模式:针对kafka的多个分区进行消费,一个分区对应spark一个partition,offset由spark streaming自己维护,可以写入到checkpoint中去,消费一次且仅一次,不需要WAL,性能这块提升很多,数据恢复可以从kafka的备份数据中进行恢复,不用太多担心。但是自己维护的offset默认没有提交到zookeeper上去,这样kafka监控看不到这个groupId的消费情况的。还有就是kafka的分区不会太多,所以自己得先将数据扩展到多个分区中进行处理,后续再减少分区做落地实现。一个分区对应一个task,task太少,数据量大,就会导致数据处理的延迟,这块需要注意。
  总体来说用direct模式消费kafka数据要比receiver模式要好,建议用direct模式。

猜你喜欢

转载自blog.csdn.net/qq_38019655/article/details/82530783