这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战
一、概述
DStream
上的操作与 RDD
的类似, 分为 Transformations
(转换) 和 Output Operations
(输出)两种, 此外转换操作中还有一些比较特殊的方法, 如: updateStateByKey
、transform
以及各种 Window
相关的操作。
备注:
- 在
DStream
与RDD
上的转换操作非常类似(无状态的操作) DStream
有自己特殊的操作(窗口操作、追踪状态变化操作)- 在
DStream
上的转换操作比RDD
上的转换操作少
DStream
的转化操作可以分为 无状态(stateless
) 和 有状态(stateful
) 两种:
- 无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的
RDD
转化操作, 例如map
、filter
、reduceByKey
等 - 有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的数据。有状态转化操作包括: 基于滑动窗口的转化操作 或 追踪状态变化的转化操作
二、无状态转换
无状态转化操作就是把简单的 RDD
转化操作应用到每个批次上, 也就是转化 DStream
中的每一个 RDD
。
常见的无状态转换包括: map
、flatMap
、filter
、repartition
、reduceByKey
、groupByKey
;
直接作用在
DStream
上。
案例:黑名单过滤
假设: arr1
为黑名单数据(自定义), true
表示数据生效, 需要被过滤掉; false
表示数据未生效 val arr1 = Array(("spark", true), ("scala", false))
假设: 流式数据格式为 "time word", 需要根据黑名单中的数据对流式数据执行过滤操作。如 "2 spark" 要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive
// 结果:"2 spark" 被过滤
复制代码
方法有三:
- 方法一: 使用外连接
- 方法二: 使用
SQL
- 方法三: 直接过滤
1) 方法一:使用外链接
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object BlackListFilter1 {
def main(args: Array[String]) {
// 初始化
val conf = new
SparkConf().setAppName(this.getClass.getCanonicalName).setMaster(
"local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单数据
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// 生成测试DStream。使用ConstantInputDStream
val strArray: Array[String] = ("spark java scala hadoop kafka " +
"hive hbase zookeeper")
.split("\\s+")
.zipWithIndex
.map { case (word, idx) => s"$idx $word" }
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
val clickStreamFormatted = clickStream.map(value =>
(value.split(" ")(1), value))
clickStreamFormatted.transform(clickRDD => {
// 通过leftOuterJoin操作既保留了左侧RDD的所有内容,又获得了内容是否在黑名单中
val joinedBlackListRDD: RDD[(String, (String, Option[Boolean]))] = clickRDD.leftOuterJoin(blackListRDD)
joinedBlackListRDD.filter { case (word, (streamingLine, flag)) =>
if (flag.getOrElse(false)) false
else true
}.map { case (word, (streamingLine, flag)) => streamingLine
}
}).print()
// 启动流式作业
ssc.start()
ssc.awaitTermination()
}
}
复制代码
2)方法二:使用 SQL
3)方法三:直接过滤
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object BlackListFilter3 {
def main(args: Array[String]) {
// 初始化
val conf = new
SparkConf().setAppName(this.getClass.getCanonicalName).setMaster(
"local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.sparkContext.setLogLevel("WARN")
// 黑名单数据
val blackList = Array(("spark", true), ("scala", true))
val blackListBC: Broadcast[Array[String]] =
ssc.sparkContext.broadcast(blackList.filter(_._2).map(_._1))
// 生成测试DStream。使用ConstantInputDStream
val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
.split("\\s+")
.zipWithIndex
.map { case (word, idx) => s"$idx $word" }
val rdd = ssc.sparkContext.makeRDD(strArray)
val clickStream = new ConstantInputDStream(ssc, rdd)
// 流式数据的处理
clickStream.map(value => (value.split(" ")(1), value))
.filter { case (word, _) =>
!blackListBC.value.contains(word)
}
.map(_._2)
.print()
// 启动流式作业
ssc.start()
ssc.awaitTermination()
}
}
复制代码
三、有状态转换
有状态的转换主要有两种: 窗口操作、状态跟踪操作
(1)窗口操作
Window Operations
可以设置窗口大小和滑动窗口间隔来动态的获取当前 Streaming
的状态。
基于窗口的操作会在一个比 StreamingContext
的 batchDuration
(批次间隔)更长的时间范围内, 通过整合多个批次的结果, 计算出整个窗口的结果。
如图:
基于窗口的操作需要两个参数:
- 窗口长度(
windowDuration
)。控制每次计算最近的多少个批次的数据 - 滑动间隔(
slideDuration
)。用来控制对新的DStream
进行计算的间隔
两者都必须是 StreamingContext
中批次间隔(batchDuration
)的整数倍。
每秒发送1个数字:
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
/**
* @author donald
* @date 2021/02/23
*/
object SocketLikeNCWithWindow {
def main(args: Array[String]): Unit = {
val port = 1521
val ss = new ServerSocket(port)
val socket: Socket = ss.accept()
println("connect to host : " + socket.getInetAddress)
var i = 0
// 每秒发送1个数
while(true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}
复制代码
1)举个栗子一
- 观察窗口的数据;
- 观察
batchDuration
、windowDuration
、slideDuration
三者之间的关系; - 使用窗口相关的操作;
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object WindowDemo {
def main(args: Array[String]): Unit = {
val conf = new
SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
// 每 5s 生成一个RDD(mini-batch)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 1521)
lines.foreachRDD{ (rdd, time) =>
println(s"rdd = ${rdd.id}; time = $time")
rdd.foreach(value => println(value))
}
// 20s 窗口长度(ds包含窗口长度范围内的数据);10s 滑动间隔(多次时间处理一次数据)
val res1: DStream[String] = lines.reduceByWindow(_ + " " + _,
Seconds(20), Seconds(10))
res1.print()
val res2: DStream[String] = lines.window(Seconds(20),
Seconds(10))
res2.print()
// 求窗口元素的和
val res3: DStream[Int] =
lines.map(_.toInt).reduceByWindow(_+_, Seconds(20), Seconds(10))
res3.print()
// 求窗口元素的和
val res4 = res2.map(_.toInt).reduce(_+_)
res4.print()
ssc.start()
ssc.awaitTermination()
}
}
复制代码
2)举个栗子二
热点搜索词实时统计:每隔 10 秒, 统计最近20秒的词出现的次数。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object HotWordStats {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
//设置检查点,检查点具有容错机制。生产环境中应设置到HDFS
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
// 通过reduceByKeyAndWindow算子, 每隔10秒统计最近20秒的词出现的次数
// 后 3个参数:窗口时间长度、滑动窗口时间、分区
val wordCounts1: DStream[(String, Int)] =
pairs.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(20),
Seconds(10), 2)
wordCounts1.print
// 这里需要checkpoint的支持
val wordCounts2: DStream[(String, Int)] =
pairs.reduceByKeyAndWindow(
_ + _,
_ - _,
Seconds(20),
Seconds(10), 2)
wordCounts2.print
ssc.start()
ssc.awaitTermination()
}
}
复制代码
(2)updateStateByKey
状态跟踪操作
UpdateStateByKey
的主要功能:
- 为
Streaming
中每一个Key
维护一份state
状态,state
类型可以是任意类型的, 可以是自定义对象; 更新函数也可以是自定义的。 - 通过更新函数对该
key
的状态不断更新, 对于每个新的batch
而言,Spark Streaming
会在使用updateStateByKey
的时候为已经存在的key
进行state
的状态更新 - 使用
updateStateByKey
时要开启checkpoint
功能
流式程序启动后计算 wordcount
的累计值, 将每个批次的结果保存到文件:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object StateTracker1 {
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf().setMaster("local[2]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDstream: DStream[(String, Int)] = words.map(x => (x,
1))
// 定义状态更新函数
// 函数常量定义,返回类型是Some(Int),表示的含义是最新状态
// 函数的功能是将当前时间间隔内产生的Key的value集合,加到上一个状态中, 得到最新状态
val updateFunc = (currValues: Seq[Int], prevValueState:
Option[Int]) => {
//通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的 Seq,再计算当前批次的总和
val currentCount = currValues.sum
// 已累加的值
val previousCount = prevValueState.getOrElse(0)
Some(currentCount + previousCount)
}
val stateDstream: DStream[(String, Int)] =
wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
val outputDir = "data/output1"
stateDstream.repartition(1)
.saveAsTextFiles(outputDir)
ssc.start()
ssc.awaitTermination()
}
}
复制代码
统计全局的 key
的状态, 但是就算没有数据输入, 也会在每一个批次的时候返回之前的 key
的状态。
这样的缺点: 如果数据量很大的话, checkpoint
数据会占用较大的存储, 而且效率也不高。
mapWithState
: 也是用于全局统计 key
的状态。如果没有数据输入, 便不会返回之前的 key
的状态, 有一点增量的感觉。
这样做的好处是, 只关心那些已经发生的变化的 key
, 对于没有数据输入, 则不会返回那些没有变化的 key
的数据。即使数据量很大, checkpoint
也不会像 updateStateByKey
那样, 占用太多的存储。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
/**
* @author donald
* @date 2021/02/23
*/
object StateTracker2 {
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("data/checkpoint/")
val lines: ReceiverInputDStream[String] =
ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
// 函数返回的类型即为 mapWithState 的返回类型
// (KeyType, Option[ValueType], State[StateType]) => MappedType
def mappingFunction(key: String, one: Option[Int], state:
State[Int]): (String, Int) = {
val sum: Int = one.getOrElse(0) +
state.getOption.getOrElse(0)
state.update(sum)
(key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream: DStream[(String, Int)] =
wordDStream.mapWithState[Int, (String, Int)](spec)
resultDStream.cache()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
val outputDir = "data/output2/"
resultDStream.repartition(1)
.saveAsTextFiles(outputDir)
ssc.start()
ssc.awaitTermination()
}
}
复制代码