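Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. https://blog.csdn.net/qq_24084925/article/details/81061818
A streaming program I wrote recently needs to read variable information from Redis and broadcast it. The data in Redis changes over time, so the broadcast variable has to change along with it. Below is the test code: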
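// Imports assumed for the spark-streaming-kafka-0-10 integration.
// ssc, sc, topics, kafkaParams, RedisFieldData and the var appKeyBroadCast
// are defined elsewhere in the program.
import java.net.URLDecoder
import scala.collection.mutable
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val dStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).map(x => URLDecoder.decode(x.value()))

// @2: broadcast outside foreachRDD
val appkeyMap2 = RedisFieldData.getMap
appKeyBroadCast = sc.broadcast(appkeyMap2)

// Output the result computed for each one-minute batch
dStream.foreachRDD { rdd =>
  var map: scala.collection.mutable.Map[String, String] = mutable.Map[String, String]()
  var keySet: Set[String] = Set()
  var valueSet: Set[String] = Set()
  import scala.collection.JavaConversions._
  val appkeyMap = RedisFieldData.getMap //.asInstanceOf[scala.collection.immutable.Map[String,String]]
  if (appkeyMap != null) {
    appkeyMap.map { x => keySet += x._1 }
    appkeyMap.map { x => valueSet += x._2 }
  }
  // Drop the previous broadcast before publishing a new one
  if (appKeyBroadCast != null) {
    appKeyBroadCast.unpersist()
    println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
  }
  // @1: broadcast inside foreachRDD
  appKeyBroadCast = sc.broadcast(appkeyMap)
  println("Size of the broadcast variable xxxxx : " + appKeyBroadCast.value.size())
  println("=============================================================================================")
  //rdd.take(10)
  /*if ((appkeySet -- keySet) != Set()){
    map.putAll(updateOrSetAppKeyMap(appkeySet, keySet, valueSet))
    RedisFieldData.setMap(map)
    //appkeyMap.putAll(map)
    map.putAll(appkeyMap)
  }*/
  //println("new map :"+map.isEmpty)
}

// if(appKeyBroadCast==null){
println("Size of the broadcast variable : " + appKeyBroadCast.value.size())
//}
dStream.print()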
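The code tests broadcasting in two places: one inside foreachRDD (@1) and one outside it (@2).
@1: The broadcast variable is updated in every batch duration, but the updated broadcast variable has no effect outside foreachRDD.
@2: The broadcast code runs only once, at startup, and is not run again in later durations, because code outside foreachRDD executes only while the streaming job is being set up.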
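The difference between @1 and @2 can be reproduced without Spark. The sketch below is only a simulation: `FakeStream`, `foreachBatch` and `simulate` are hypothetical names invented for illustration, standing in for a DStream that stores a callback once and replays it for every batch.

```scala
object BatchScopeDemo {
  // Hypothetical stand-in for a DStream: it only records callbacks and
  // replays them once per simulated batch. This is not Spark code.
  final class FakeStream {
    private var callbacks = Vector.empty[Int => Unit]

    def foreachBatch(f: Int => Unit): Unit = callbacks :+= f

    def run(batches: Int): Unit =
      for (batch <- 1 to batches; f <- callbacks) f(batch)
  }

  // Returns (times the outer code ran, times the callback body ran).
  def simulate(batches: Int): (Int, Int) = {
    val stream = new FakeStream
    var outsideRuns = 0
    var insideRuns = 0

    outsideRuns += 1                              // like @2: runs while the job is defined
    stream.foreachBatch { _ => insideRuns += 1 }  // like @1: runs for every batch

    stream.run(batches)
    (outsideRuns, insideRuns)
  }

  def main(args: Array[String]): Unit = {
    val (outside, inside) = simulate(3)
    println(s"outside=$outside inside=$inside")   // prints "outside=1 inside=3"
  }
}
```

The same reasoning suggests that any work depending on the refreshed value should also live inside the foreachRDD body, since that is the only part of the program that is re-executed per batch.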
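In addition, broadcasting the variable ran into a serialization problem, which went away once checkpointing was removed. After some searching I learned that checkpointing in the current version still has quite a few problems. For details, see "Drawbacks of checkpoint": http://blog.csdn.net/u010454030/article/details/54985740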
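For readers who need to keep checkpointing, the Spark Streaming programming guide notes that broadcast variables cannot be recovered from a checkpoint and recommends holding them in a lazily instantiated singleton. Whether that was the cause of the serialization error above is an assumption. The sketch below adapts that singleton idea so the value is rebroadcast in every batch; `AppKeyBroadcast`, `AppKeyJob`, `refresh`, `attach` and the `load` parameter are hypothetical names, and `load` stands in for a Redis read such as `RedisFieldData.getMap`.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder: keeps the broadcast handle in a singleton object
// instead of a field that is captured by the foreachRDD closure.
object AppKeyBroadcast {
  @volatile private var instance: Broadcast[Map[String, String]] = null

  // Drops the previous broadcast (if any) and publishes a fresh copy.
  def refresh(sc: SparkContext,
              load: () => Map[String, String]): Broadcast[Map[String, String]] =
    synchronized {
      if (instance != null) instance.unpersist(blocking = false)
      instance = sc.broadcast(load())
      instance
    }
}

object AppKeyJob {
  // `load` is assumed to return the current mapping. With checkpointing
  // enabled it is serialized together with the DStream graph, so it must
  // not capture non-serializable state.
  def attach(stream: DStream[String], load: () => Map[String, String]): Unit =
    stream.foreachRDD { rdd =>
      // Driver side, once per batch: take the context from the RDD itself
      // rather than capturing an outer `sc`.
      val appKeys = AppKeyBroadcast.refresh(rdd.sparkContext, load)

      // Executor side: the closure captures only the broadcast handle.
      val matched = rdd.filter(line => appKeys.value.contains(line)).count()
      println(s"matched $matched records against ${appKeys.value.size} app keys")
    }
}
```

Calling `AppKeyJob.attach(dStream, load)` once during setup is enough, because the refresh itself happens inside the per-batch body. Reading from Redis on every batch is a design choice carried over from the test code; a longer refresh interval would reduce load on Redis at the cost of staler data.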