Copyright notice: original article; please credit the source when reposting: https://blog.csdn.net/xianpanjia4616/article/details/82914443
Today let's talk about dynamic broadcast variables in Spark (if you're not familiar with broadcast variables in general, see my earlier post on them). In real projects the broadcast data is sometimes dynamic, for example it needs to be refreshed once a minute, and this can be done. A broadcast variable is initialized on the driver and read on the executors, where it cannot be modified, so the trick is to rebuild it on the driver whenever the data changes. First, a quick refresher on the static case.
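Here is a minimal sketch of an ordinary (static) broadcast variable; the lookup map and the job itself are made up purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object StaticBroadcastDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StaticBroadcastDemo"))
    // The driver broadcasts a small read-only lookup map once ...
    val lookup = sc.broadcast(Map("a" -> 1.0, "b" -> 2.0))
    // ... and every executor reads it through .value, but can never modify it.
    val matched = sc.parallelize(Seq("a", "b", "c"))
      .filter(k => lookup.value.contains(k))
      .count()
    println(s"matched: $matched")
    sc.stop()
  }
}

The full streaming job, which rebuilds the broadcast periodically on the driver, looks like this: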
package test
import java.sql.{Connection, DriverManager, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util.Date
import kafka._ // project-local package that provides PropertiesScalaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
object test3 {
  // Driver-side handle to the broadcast; swapped for a fresh Broadcast object
  // whenever the reference data is reloaded.
  @volatile private var instance: Broadcast[Map[String, Double]] = null
  var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
    val conf = new SparkConf().setAppName("Spark Streaming TO ES TOPIC")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val scc = new StreamingContext(conf, Seconds(1))
    val topic = PropertiesScalaUtils.loadProperties("topic_combine")
    val topicSet = Set(topic) // the Kafka topic to consume
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "earliest", // latest / earliest
      "value.deserializer" -> classOf[StringDeserializer], // key/value deserializers
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker"),
      "group.id" -> PropertiesScalaUtils.loadProperties("groupId_es"),
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Initialize the broadcast on the driver before the first batch runs.
    getInstance(scc.sparkContext)
    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    kafkaStreams.foreachRDD(rdd => {
      // foreachRDD runs on the driver, so this is a safe place to rebroadcast.
      val current_time = sdf.format(new Date())
      val new_time = current_time.substring(14, 16).toLong // the "mm" (minutes) field
      if (new_time % 5 == 0) {
        update(rdd.sparkContext, true) // rebuild the broadcast every five minutes (see the caveat below)
      }
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // offset ranges of this batch
        // Capture the current Broadcast in a local val so it is shipped with the
        // closure; executors cannot read the driver-side `instance` var directly.
        val warningMap = instance
        rdd.foreachPartition(pr => {
          pr.foreach(pair => {
            val d = pair.value()
            if (warningMap.value.contains(d)) {
              // your own processing logic goes here
            }
          })
        })
      }
    })
    scc.start()
    scc.awaitTermination()
  }
  /**
    * Load the reference data from SQL Server into a Map.
    * @return a Map of url -> WarningPrice
    */
  def getSqlServerData(): Map[String, Double] = {
    val time = sdf.format(new Date())
    val enter_time = time.substring(0, 10) // current date, for use in the (elided) query
    var map = Map[String, Double]()
    var conn: Connection = null
    var stmt: Statement = null
    var rs: ResultSet = null
    val url = "" // JDBC url, credentials and SQL are intentionally left blank here
    val user_name = ""
    val password = ""
    val sql = ""
    try {
      conn = DriverManager.getConnection(url, user_name, password)
      stmt = conn.createStatement
      rs = stmt.executeQuery(sql)
      while (rs.next) {
        val pageUrl = rs.getString("url") // renamed from `url` to avoid shadowing the JDBC url
        val warningPrice = rs.getString("WarningPrice").toDouble
        map += (pageUrl -> warningPrice)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println("SQL Server connection failed: " + e)
    } finally {
      // Close resources in a finally block so they are released even when the query throws.
      if (rs != null) rs.close()
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
    map
  }
  /**
    * Rebuild the broadcast on the driver: unpersist the old copy from the
    * executors, then broadcast the freshly loaded data.
    * @param sc the SparkContext
    * @param blocking whether unpersist blocks until the old blocks are removed
    */
  def update(sc: SparkContext, blocking: Boolean = false): Unit = {
    if (instance != null) {
      instance.unpersist(blocking)
      instance = sc.broadcast(getSqlServerData())
    }
  }

  /**
    * Initialize the broadcast lazily, with double-checked locking so that
    * concurrent callers create it only once.
    * @param sc the SparkContext
    * @return the current Broadcast handle
    */
  def getInstance(sc: SparkContext): Broadcast[Map[String, Double]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(getSqlServerData())
        }
      }
    }
    instance
  }
}
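One note on the listing: enable.auto.commit is false and offsetRanges is captured but never committed, so nothing records the consumer's progress in Kafka. If you want the offsets stored back in Kafka after each batch, spark-streaming-kafka010 exposes CanCommitOffsets for exactly this; a minimal sketch, reusing the names from the listing above:

import org.apache.spark.streaming.kafka010.CanCommitOffsets

kafkaStreams.foreachRDD(rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch first ...
  // Then commit the offsets of this batch back to Kafka asynchronously.
  kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})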
That's the whole job: it loads the data from SQL Server, broadcasts it to every executor, and refreshes the value of the broadcast every five minutes.
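One caveat about the refresh trigger: with one-second batches, new_time % 5 == 0 holds for every batch during a matching minute, so the broadcast is actually rebuilt many times in a row. A sketch of a sturdier trigger that remembers the last refresh time on the driver (lastUpdated and the interval constant are my own additions, not part of the original code):

private var lastUpdated = 0L // driver-side timestamp of the last rebroadcast

kafkaStreams.foreachRDD(rdd => {
  val now = System.currentTimeMillis()
  // Rebuild at most once per five-minute window, regardless of the batch interval.
  if (now - lastUpdated >= 5 * 60 * 1000) {
    update(rdd.sparkContext, blocking = true)
    lastUpdated = now
  }
  // ... rest of the batch processing as in the listing above ...
})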
If anything here is wrong, corrections are welcome; if you have any questions, you can join QQ group 340297350. Thanks!