版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lv_yishi/article/details/83925672
/*
transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中
所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join
的操作。但是我们自己就可以使用transform操作来实现该功能。
DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行
join
*/
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 过滤广告黑名单案例
* 用户对我们的网站上的广告可以进行点击
* 点击之后,是不是要进行实时计费,点一下,算一次钱
* 但是,对于那些帮助某些无良商家刷广告的人,那么我们有一个黑名单
* 只要是黑名单中的用户点击的广告,我们就给过滤掉
* transform操作,应用在DStream上,可以用于执行任意的RDD到RDD的转换操作。
*他可以用于实现,DStream API中所没有提供的操作。
*
*/
object transformDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("windowOpObj").setMaster("local[*]")
val ssc= new StreamingContext(conf,Seconds(5))
//定义一个黑名单
val blackList=List(("tom",true),("jerry",true))
val blRDD = ssc.sparkContext.parallelize(blackList)
//使用socketTestStream来监听端口,也就是接收到的实时数据
val sockrtData = ssc.socketTextStream("192.168.88.130",8888)
//解析出用户的信息,将接收到的数据转换成和我们黑名单对应得形式数据格式
val user: DStream[(String, String)] = sockrtData.map(line=>(line.split("_")(1),line))
//将两个结果集放到一个RDD里面
//使用Spark Streaming中的transform算子操作,实现过滤
//transform转换之后DStream变为RDD
val fllterRDD = user.transform(u=>{
//(KEY,("收到的行信息",null/true))
val joinrdd=u.leftOuterJoin(blRDD)
//我们将黑名单过滤后,将处理真正的白名单数据
val filterrdd = joinrdd.filter(tuple=>{
if(tuple._2._2.getOrElse(false)){
false
}else{
true
}
})
val valedRDD = filterrdd.map(tuple=>tuple._2._1)
valedRDD
})
fllterRDD.print()
ssc.start()
ssc.awaitTermination()
}
}
/*
Time: 1541755835000 ms
Time: 1541755840000 ms
*/