spark-streaming作为一个24*7不间断运行的程序来设计,但是程序都会crash,如果crash了,如何保证数据不丢失,不重复。
Input DStreams and Receivers
spark streaming提供了两种streaming input source:
- basic source: Source directly avaliable in the StreamingContext API. Examples: file,socket connnection
- advanced source: Source like kafka/kinesis, etc. are avaliable through extra utility classes.
本文只讨论高级数据源,因为针对流计算场景,基本数据源不适用。
高级数据源,这里以kafka为例,kafka作为输入源,有两种方式:
1. Receiver-based 方式
2. Direct 方式
两种方式的对比见博客:
保证数据不丢失(at-least)
spark RDD内部机制可以保证数据at-least语义。
Receiver方式
开启WAL(预写日志),将从kafka中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
Direct方式
依靠checkpoint机制来保证。
保证数据不重复(exactly-once)
要保证数据不重复,即Exactly once语义。
- 幂等操作:重复执行不会产生问题,不需要做额外的工作即可保证数据不重复。
- 业务代码添加事务操作
dstream.foreachRDD {(rdd, time) =
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds,partitionId)
//use this uniqueId to transationally commit the data in partitionIterator
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
就是说针对每个partition的数据,产生一个uniqueId,只有这个partition的所有数据被完全消费,则算成功,否则算失效,要回滚。下次重复执行这个uniqueId时,如果已经被执行成功,则skip掉。