1.两种方式管理偏移量并将偏移量写入redis
(1)第一种:rdd的形式
一般是使用这种直连的方式,但其缺点是没法调用一些更加高级的api,如窗口操作。如果想更加精确的控制偏移量,就使用这种方式
代码如下
KafkaStreamingWordCountManageOffsetRddApi
package com._51doit.spark13

import com._51doit.utils.JedisConnectionPool
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
 * Direct-stream Kafka WordCount that manages offsets manually at the RDD level.
 *
 * Flow per batch: read a KafkaRDD, capture its per-partition offset ranges on the
 * Driver, aggregate word counts, write the counts to Redis from the Executors,
 * then asynchronously commit the captured offsets back to Kafka
 * (__consumer_offsets). This is at-least-once: if the async commit fails after
 * the Redis write succeeded, the batch is re-read and re-counted on restart.
 */
object KafkaStreamingWordCountManageOffsetRddApi {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")

    // StreamingContext with a 5-second batch interval.
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    ssc.sparkContext.setLogLevel("WARN")

    val topics = Array("test11")

    // Kafka consumer parameters. Auto-commit is disabled so offsets are only
    // committed after the batch's results have been written to Redis.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "feng05:9092,feng06:9092,feng07:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "g013",
      // No recorded offset -> start from the beginning; otherwise resume.
      "auto.offset.reset" -> "earliest",
      // Do not auto-commit; offsets are committed manually below.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Direct stream: uses the low-level consumer API; RDD partitions map
    // one-to-one onto Kafka partitions.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // schedule tasks onto the Kafka nodes when possible
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )

    kafkaDStream.foreachRDD(rdd => {
      // Skip empty batches entirely (no work, no offset commit needed).
      if (!rdd.isEmpty()) {
        // Capture each partition's offset range on the Driver BEFORE any
        // transformation, while the RDD is still a KafkaRDD.
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // WordCount transformations (lazy until the action below).
        val lines: RDD[String] = rdd.map(_.value())
        val words: RDD[String] = lines.flatMap(_.split(" "))
        val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
        val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

        // Action: write each result partition to Redis from the Executor.
        reduced.foreachPartition(it => {
          // Obtain a pooled connection on the Executor.
          val jedis = JedisConnectionPool.getConnection
          try {
            jedis.select(3)
            it.foreach(t => {
              jedis.hincrBy("wc_adv", t._1, t._2)
            })
          } finally {
            // Always return the connection to the pool, even if a write failed;
            // the original leaked the connection on exception.
            jedis.close()
          }
        })

        // Only after the Redis writes succeeded: asynchronously commit this
        // batch's offsets to Kafka's __consumer_offsets topic.
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
(2) 第二种:DStream的形式
功能更加丰富,可以使用DStream的api,但最终还是要调用foreachRDD,将数据写入redis
代码如下
KafkaStreamingWordCountManageOffsetDstreamApi
package com._51doit.spark13

import com._51doit.utils.JedisConnectionPool
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * Direct-stream Kafka WordCount that manages offsets at the DStream level.
 *
 * Compared with the pure-RDD variant, this keeps the DStream API available
 * (window operations, updateStateByKey, ...) by capturing offset ranges inside
 * `transform`, then still uses foreachRDD to write results to Redis and commit
 * the offsets. Same at-least-once semantics as the RDD variant.
 */
object KafkaStreamingWordCountManageOffsetDstreamApi {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")

    // StreamingContext with a 5-second batch interval.
    val ssc: StreamingContext = new StreamingContext(conf, Milliseconds(5000))
    ssc.sparkContext.setLogLevel("WARN")

    val topics = Array("test11")

    // Kafka consumer parameters; auto-commit disabled so offsets are committed
    // manually after each batch's results are written to Redis.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "feng05:9092,feng06:9092,feng07:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "g014",
      // No recorded offset -> start from the beginning; otherwise resume.
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Direct stream: RDD partitions map one-to-one onto Kafka partitions.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // schedule tasks onto the Kafka nodes when possible
        // FIX: the original read `ConsumerStrategies Subscribe[...]` (missing
        // dot) — infix notation with type arguments does not compile.
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )

    // Updated on the Driver inside transform each batch, then read by the
    // commit below. Driver-local mutable state, not shipped to Executors.
    var offsetRanges: Array[OffsetRange] = null

    // transform exposes the underlying KafkaRDD so we can grab its offset
    // ranges while still returning a DStream for further DStream-level ops.
    val transformDS: DStream[ConsumerRecord[String, String]] = kafkaDStream.transform(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    })

    // Full DStream API is available here (e.g. updateStateByKey, windows).
    val reducedDStream: DStream[(String, Int)] =
      transformDS.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Writing to Redis still requires dropping down to foreachRDD.
    reducedDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(it => {
          // Obtain a pooled Redis connection on the Executor.
          val jedis: Jedis = JedisConnectionPool.getConnection
          try {
            jedis.select(4)
            it.foreach(t => {
              jedis.hincrBy("wc_adv2", t._1, t._2)
            })
          } finally {
            // Always return the connection to the pool (original leaked it on
            // exception).
            jedis.close()
          }
        })
        // Commit this batch's offsets on the Driver after the writes succeed.
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
以上两种方式都无法保证数据只读取处理一次(即exactly once)。因为若是提交偏移量时出现网络问题,导致偏移量没有进行更新,但是数据却成功统计到redis中,这样就会反复读取某段数据进行统计
解决方法:使用事务,即数据的统计与偏移量的写入要同时成功,否则就回滚
2. MySQL事务的测试
MySQLTransactionTest
package cn._51doit.spark.day13

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * Demonstrates a manual MySQL transaction over JDBC: two inserts into two
 * tables either both commit or both roll back. A deliberate division by zero
 * between the inserts triggers the rollback path.
 *
 * Note: only the InnoDB storage engine supports transactions.
 */
object MySQLTransactionTest {

  def main(args: Array[String]): Unit = {
    var connection: Connection = null
    var ps1: PreparedStatement = null
    var ps2: PreparedStatement = null
    try {
      // MySQL auto-commits by default; disable it to group both inserts into
      // one transaction.
      connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
      connection.setAutoCommit(false)

      ps1 = connection.prepareStatement("INSERT INTO t_user1 (name,age) VALUES (?, ?)")
      ps1.setString(1, "AAA")
      ps1.setInt(2, 18)
      ps1.executeUpdate()

      // Deliberate failure between the two inserts to exercise the rollback.
      val i = 1 / 0

      ps2 = connection.prepareStatement("INSERT INTO t_user2 (name,age) VALUES (?, ?)")
      ps2.setString(1, "BBB")
      ps2.setInt(2, 28)
      ps2.executeUpdate()

      // Commit only after every statement succeeded.
      connection.commit()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        // Guard against NPE: if getConnection itself threw, `connection` is
        // still null and the original code's unguarded rollback() would throw
        // a new NPE that masks the real failure.
        if (connection != null) {
          connection.rollback()
        }
    } finally {
      // Release JDBC resources in reverse order of acquisition.
      if (ps2 != null) {
        ps2.close()
      }
      if (ps1 != null) {
        ps1.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }
}
注意:mysql只有InnoDB引擎支持事务,其它引擎都不支持
3.利用MySQL事务实现数据统计的ExactlyOnce
思路:
从Kafka读取数据,实现ExactlyOnce,偏移量保存到MySQL中
- 1. 将聚合好的数据,收集到driver端(若不收集到driver端,count和偏移量就无法写入同一个事务:count数据是在executor中计算得到的,而事务是在driver端提交的)
- 2 然后将计算好的数据和偏移量在同一个事务中保存到MySQL中
- 3 成功了提交事务
- 4 失败了让这个任务重启
代码
package cn._51doit.spark.day13

import java.sql.{Connection, DriverManager, PreparedStatement}

import cn._51doit.spark.utils.OffsetUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
 * Exactly-once Kafka WordCount: offsets are stored in MySQL in the SAME
 * transaction as the aggregated results.
 *
 * 1. Collect the aggregated batch results to the Driver.
 * 2. Write results and offsets to MySQL inside one transaction.
 * 3. Commit on success.
 * 4. On failure, roll back and stop the job so it restarts from the last
 *    committed offsets.
 *
 * MySQL holds two tables: one for the word counts, one for the offsets.
 */
object ExactlyOnceWordCountOffsetStoreInMySQL {

  def main(args: Array[String]): Unit = {
    // Expected args, e.g.: true a1 g1 ta,tb
    val Array(isLocal, appName, groupId, allTopics) = args

    val conf = new SparkConf()
      .setAppName(appName)
    if (isLocal.toBoolean) {
      conf.setMaster("local[*]")
    }

    // StreamingContext with a 5-second batch interval.
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    ssc.sparkContext.setLogLevel("WARN")

    val topics = allTopics.split(",")

    // Kafka consumer parameters; auto-commit disabled because offsets live in
    // MySQL, not in Kafka's __consumer_offsets topic.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId,
      // No recorded offset -> start from the beginning; otherwise resume.
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Before creating the stream, load the historical offsets from MySQL:
    // empty map means start from the beginning, otherwise resume exactly
    // where the last committed transaction left off.
    val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appName, groupId)

    // Direct stream, seeded with the recovered offsets; RDD partitions map
    // one-to-one onto Kafka partitions.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // schedule tasks onto the Kafka nodes when possible
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
      )

    kafkaDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Capture per-partition offset ranges while the RDD is still a KafkaRDD.
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // WordCount business logic.
        val words: RDD[String] = rdd.flatMap(_.value().split(" "))
        val wordsAndOne: RDD[(String, Int)] = words.map((_, 1))
        val reduced: RDD[(String, Int)] = wordsAndOne.reduceByKey(_ + _)

        // Collect to the Driver so results and offsets can share ONE
        // transaction (the transaction only exists on the Driver).
        val res: Array[(String, Int)] = reduced.collect()

        var connection: Connection = null
        var ps1: PreparedStatement = null
        var ps2: PreparedStatement = null
        try {
          // MySQL auto-commits by default; disable it so both writes below
          // belong to the same transaction.
          connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456")
          connection.setAutoCommit(false)

          // Upsert the word counts; nothing is visible until commit().
          ps1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
          for (tp <- res) {
            ps1.setString(1, tp._1)
            ps1.setLong(2, tp._2)
            ps1.setLong(3, tp._2)
            ps1.executeUpdate()
          }

          // Upsert the offsets in the SAME transaction, keyed by
          // (app_gid, topic_partition), e.g. (app1_g001, wc_0) -> 1000.
          ps2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
          for (offsetRange <- offsetRanges) {
            val topic = offsetRange.topic
            val partition = offsetRange.partition
            // The exclusive end offset: where the next batch should resume.
            val untilOffset = offsetRange.untilOffset
            ps2.setString(1, appName + "_" + groupId)
            ps2.setString(2, topic + "_" + partition)
            ps2.setLong(3, untilOffset)
            ps2.setLong(4, untilOffset)
            ps2.executeUpdate()
          }

          // Atomically publish results AND offsets together.
          connection.commit()
        } catch {
          case e: Exception =>
            // Log the failure instead of swallowing it silently (the original
            // discarded the exception).
            e.printStackTrace()
            // Guard against NPE: if getConnection itself threw, `connection`
            // is still null and an unguarded rollback() would mask the real
            // failure with an NPE.
            if (connection != null) {
              connection.rollback()
            }
            // Stop the job so it restarts from the last committed offsets.
            ssc.stop()
        } finally {
          // Release JDBC resources in reverse order of acquisition.
          if (ps2 != null) {
            ps2.close()
          }
          if (ps1 != null) {
            ps1.close()
          }
          if (connection != null) {
            connection.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
asdfasdf