Create a topic:
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic sparkafka \
--zookeeper node01:2181,node02:2181,node03:2181
1. Kafka 0.8: two ways of receiving data
First approach: ReceiverDStream (risk of duplicate consumption)
It consumes with the high-level consumer API and stores the offsets in ZooKeeper, which gives at-least-once semantics, so records may be consumed more than once.
Reason: offsets are auto-committed on a fixed interval, so a batch can already be processed while its offset has not yet been committed; after a failure those records are read again (see the sketch after the code below).
Code:
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.immutable

object spark_kafka {
  // update function for updateStateByKey: add this batch's counts to the running total
  def updateFunc(input: Seq[Int], resultPut: Option[Int]): Option[Int] = {
    val result = input.sum + resultPut.getOrElse(0)
    Option(result)
  }

  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("sparkKafka"))
    val streaming = new StreamingContext(context, Seconds(5))
    context.setLogLevel("WARN")
    // checkpoint directory, required by updateStateByKey
    streaming.checkpoint("./check_point")
    val zkQuorum = "node01:2181,node02:2181,node03:2181"
    val groupId = "Receiver"
    val topics = Map("sparkafka" -> 3)
    // start three receivers, one per topic partition, and collect the streams in an IndexedSeq
    val receiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] =
      (1 to 3).map(x => {
        KafkaUtils.createStream(streaming, zkQuorum, groupId, topics)
      })
    // each record is a (key, value) pair; only the value is needed here
    // merge the three receiver streams into one DStream
    val union: DStream[(String, String)] = streaming.union(receiverDstream)
    // keep only the value
    val line = union.map(_._2)
    // word count with state carried across batches
    val value = line.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)
    value.print()
    streaming.start()
    streaming.awaitTermination()
  }
}
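The duplication described above comes from the high-level consumer committing offsets to ZooKeeper on a timer. Below is a minimal sketch of making that interval explicit, using the createStream overload that takes a kafkaParams map; the interval value and storage level are assumptions for illustration, not from the original example:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// uses the same StreamingContext ("streaming") as in the example above
val receiverParams = Map(
  "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
  "group.id" -> "Receiver",
  // the high-level consumer commits offsets to ZooKeeper on this interval (ms);
  // a batch processed but not yet committed inside this window is re-read after a crash
  "auto.commit.interval.ms" -> "5000"
)
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  streaming, receiverParams, Map("sparkafka" -> 3), StorageLevel.MEMORY_AND_DISK_SER)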
Second approach: DirectDStream (risk of data loss)
It consumes with the low-level (simple) consumer API; offsets are not written to ZooKeeper but tracked by Spark Streaming itself. Used this way it is effectively at-most-once, so records can be lost.
Reason: by default consumption starts from the latest offset, so records produced while the job is not running are skipped (see the sketch after the code below).
Code:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object spark_kafka02 {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("sparkKafka").setMaster("local[4]"))
    sparkContext.setLogLevel("WARN")
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    /**
     * Signature of createDirectStream:
     * [K: ClassTag,
     *  V: ClassTag,
     *  KD <: Decoder[K]: ClassTag,
     *  VD <: Decoder[V]: ClassTag] (
     *   ssc: StreamingContext,
     *   kafkaParams: Map[String, String],
     *   topics: Set[String])
     */
    // Kafka connection parameters: broker list and consumer group id
    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9092,node02:9092,node03:9092",
      "group.id" -> "Kafka_Direct")
    val topicDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        streamingContext, kafkaParams, Set("sparkafka"))
    // each record is a (key, value) pair; keep only the value
    val resultLine = topicDStream.map(_._2)
    // word pairs for the current batch (no state carried across batches)
    val result = resultLine.flatMap(_.split(" ")).map((_, 1))
    result.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
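One way to reduce skipped records with the 0.8 direct API is to start from the earliest available offset and to enable checkpointing so the consumed offsets survive a driver restart. A minimal sketch under those assumptions (the checkpoint path is illustrative, not from the original); it would replace the kafkaParams map in the example above and add one line before createDirectStream:

// start from the smallest (earliest) offset instead of the latest,
// so records produced before the job's first run are not skipped
val kafkaParams = Map(
  "metadata.broker.list" -> "node01:9092,node02:9092,node03:9092",
  "group.id" -> "Kafka_Direct",
  "auto.offset.reset" -> "smallest")
// with checkpointing enabled, Spark restores the already-consumed offsets after a restart
streamingContext.checkpoint("./direct_check_point")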
2. Kafka 0.10
There is only one way of receiving data: DirectDStream.
DirectDStream: it reads directly from the brokers with the Kafka new consumer API (no receiver), and offsets are stored by default in Kafka's internal __consumer_offsets topic. Combined with manual offset commits after processing, this avoids both data loss and, in normal operation, duplicate consumption, approaching exactly-once semantics.
Reason: offsets are committed only after a batch has been processed, so nothing is lost; if the job fails between processing and committing, the batch is replayed, so the output should be idempotent for true exactly-once.
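The code below assumes the spark-streaming-kafka-0-10 integration artifact is on the classpath; here is a minimal sbt sketch (the version numbers are assumptions and should match the cluster's Spark version):

// build.sbt (versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
)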
Code:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._

object sparkKafka {
  def main(args: Array[String]): Unit = {
    /**
     * Signature of createDirectStream:
     *   ssc: StreamingContext,
     *   locationStrategy: LocationStrategy,
     *   consumerStrategy: ConsumerStrategy[K, V]
     */
    val sparkContext = new SparkContext(new SparkConf().setAppName("sparkKafka").setMaster("local[4]"))
    sparkContext.setLogLevel("WARN")
    val streamingContext = new StreamingContext(sparkContext, Seconds(3))
    // distribute the Kafka partitions evenly across the available executors
    val consistent = LocationStrategies.PreferConsistent
    /**
     * Signature of ConsumerStrategies.Subscribe:
     * [K, V](
     *   topics: ju.Collection[jl.String],
     *   kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V]
     */
    // Kafka broker list
    val brokers = "node01:9092,node02:9092,node03:9092"
    // consumer group id
    val group = "sparkafkaGroup"
    // consumer configuration
    val kafkaParam = Map(
      "bootstrap.servers" -> brokers, // brokers used to bootstrap the connection to the cluster
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // identifies which consumer group this consumer belongs to
      "group.id" -> group,
      // used when there is no initial offset, or the current offset no longer exists on the server;
      // "latest" resets the offset to the latest record
      "auto.offset.reset" -> "latest",
      // if true, offsets are committed automatically in the background;
      // disabled here because offsets are committed manually after processing
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val consumerStrategies = ConsumerStrategies.Subscribe[String, String](Array("sparkafka"), kafkaParam)
    val resultDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](streamingContext, consistent, consumerStrategies)
    // process each RDD of the stream; an RDD has one partition per Kafka partition,
    // and each partition holds many records
    resultDStream.foreachRDD(rdd => {
      // only handle non-empty RDDs
      if (rdd.count() > 0) {
        // process every record (here: just print its value)
        rdd.foreach(record => {
          val value = record.value()
          print(value)
        })
        // manual offset commit:
        // cast the RDD to HasOffsetRanges to obtain all offset ranges it covers
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // commit the offsets asynchronously, after the batch has been processed
        resultDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
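When verifying the manual commit, it can help to log which offset ranges are about to be committed. A small sketch that could be placed just before the commitAsync call in the example above (it reuses the ranges variable from that example):

// print the offsets covered by this batch, per topic-partition
ranges.foreach(r =>
  println(s"commit topic=${r.topic} partition=${r.partition} from=${r.fromOffset} until=${r.untilOffset}"))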