两种消费模式
一、基于Receiver的方式
Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复,但是效率底下,并且容易导致executor内存溢出,不推荐使用。
注意点:
1、Kafka中topic的partition,与Spark中的RDD的partition是没有关系的。所以,增加kafka中topic的分区数,只会增加receiver的个数,就是读取topic的线程数量,并不会增加spark处理数据的并行度。
2、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
代码演示
val kafkaStream = {
val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
val kafkaParams = Map(
"zookeeper.connect" -> "zookeeper1:2181",
"group.id" -> "spark-streaming-test",
"zookeeper.connection.timeout.ms" -> "1000")
// topic名称
val topic_name = "input-topic"
// kafka的topic分区数
val num_of_topic_partitions = 5
// 启动与kafka相对应的分区数的receiver去接收数据
val streams = (1 to num_of_topic_partitions) map { _ =>
KafkaUtils.createStream(ssc, kafkaParams, Map(topic_name -> 1),StorageLevel.MEMORY_ONLY_SER).map(_._2)
}
// 将所有分区的数据联合
val unifiedStream = ssc.union(streams)
// 重分区(即spark并行度)
val sparkProcessingParallelism = 1
unifiedStream.repartition(sparkProcessingParallelism)
}
二、基于Direct的方式
Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
优点
1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
代码演示
// topic名称
val topics_name = Set("teststreaming")
// kafka地址
val brokers = "localhost1:9092,localhost2:9092,localhost3:9092"
// 连接参数
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics_name )
// 处理数据
val events = kafkaStream.flatMap(line => {
Some(line.toString())
})
三种消费语义
消费者注册到kafka的方式:
subscribe:这种方式在新增topic或者partition或者消费者增加或者消费者减少的时候,会进行消费者组内消费者的再平衡。
assign:这种方式注册的消费者不会进行rebalance。
1、At-most-once 最多消费一次,可能会导致数据丢失
(offset已经提交了,数据还没处理完,消费者挂了)
2、At-least-once 最少消费一次,可能会导致重复消费
(数据处理完了,offset还没提交之前,消费者挂了)
3、Exactly-once 恰好消费一次,保证数据不丢失不重复消费
(offset和数据同时处理完毕)
需要保证offset提交和数据存储在同一个事务里面,即存储到同一个库,例如Mysql等等。不同库难以实现该语义。
区别:
spark streaming基于receiver的方式都是实现了最少消费的语义,保证了数据不丢失,保证先输出结果,然后再提交offset到zk。
sparkstreaming direct api的方式,offset的控制就比较灵活,加上他去了receiver大大提升了效率,受到了广泛应用。但是缺点还是有的,那就是offset,需要手动维护,常见的方式是将offset提交到zk,然而这种方式由于输出结果和offset中间还是存在步骤差,会导致数据多次处理,结果多次输出,那么要求我们保证多次输出结果不影响我们的业务。