spark streaming
作者:黑哥
1,spark streaming 简介
Spark Streaming 是一个分布式数据流处理框架。 使用它可以很容易地开发近乎实时的 分布式实时数据流处理程序。 它不仅拥有简单的编程模型,还能处理大数量的数据流。 使 用它也可以把历史数据和实时数据结合起来处理。
2,spark streaming API
SparkStreaming API 有两个关键抽象 StreamingContext 和离散流。
2.1 StreamingContext
StreamingContext 是一个在 Spark Streaming 库中定义的类,它是 Spark Streaming 库 的入口点g 它使得 Spark Streaming 应用能够连接到 Spa,rk 集群上。 它还提供了创建 Spark Streaming 数据流抽象实例的方法。
注意:每一个 Spark Streaming 应用都必须创建一个 StreamingContext 类实例。
streamingcontext 的创建实例:
val config = new SparkConf ()
.setMaster( "spark: //host: port")
.setAppName("big streaming app")
val batch!nterval = 5
val ssc = new StreamingContext(conf, Seconds(batch!nterval))
//或从已有的了spark context类实例,可应它来创建streamingcontext类实例,如下:
val conf = new SparkConf ()
.setMaster( "spark: //host: port")
.setAppName("big streaming app")
val sc = new SparkContext(conf)
val batch!nterval = 5
val ssc = new StreamingContext(sc, Seconds(batch!nterval))
StreamingContext 构造函数的第二个参数指定了每一批的大小,以时间为单位。
数据流 根据这个时间间隔分割成批,每一批数据都被当成一个 ROD 来处理。 上面的例子中每一批 的时间间隔为 10 秒钟。 Spark Streaming 将每 10 秒钟从数据流源创建一个 RDD。
2.2 Dstream
离散数据流( DStream )是 Spark Streaming 处理数据流的主要抽象。 它代表一个数据 流.并且可以使用它上面定义的操作来处理数据流。
创建Dstream:可以从一个数据流来源创建,也可以通过从现有的 DStream 执行转换操作得 到。
2.2.1 从基本源创建 DStream
- socketTextStream
val lines = ssc.socketTextStream(”localhost", 9999)
它有 3 个 参数。第 l 个参数是数据源的主机名。
第 2 个参数是接收数据的连接的端口号。
第 3 个参 数是可选的,用于指定接收数据的存储等级。
- textFi leStream
val lines = ssc.textFileStream("input_directory”)
//textFileStream 方法将会创建一个 DStream 用于监控 Hadoop 兼容的文件系统上是否有 新文件创建,
//如果有.就作为文本文件读取新文件内容。
这个方法的参数是需要监控的文 件目录。 写入被监控目录的文件必须是从同样文件系统中移动过来的。
注意:比如在 Linux 系统 中,写入被监控目录的文件必须是使用 mv 命令移动过来的。
- actorStream
actorStream 方法将会创建一个使用用户自己实现的 Akka actor 接收器的 DStream。
2.2.2 高级服源
Spark Streaming 本身不提供从南自r-1 Kafka 、 Flume 或 Twitter 这样的高级惊创建 DStream 的工厂方法 ,只有第三方库才提供。 为了能处理高级源中的数据流, Spark Streaming 应用 需要按照如下步骤编写。 l 导人高级师、对应的工具类, jf:使用该类提供的工厂方法创建 DStream0 2. 链J夹带有这个工具类的库。 3 创建一个包含应用所有依赖的 uber JAR,然后将应用部署在 Spark 集群上。 举例来说,为了处理 Twitter 的 tweet,应用不得不导入 TwitterUtils 类.并使用它的 createStream 方法米创建一个用于处理 tweet 的 DStream。
import org.apache.spark.streaming.twitter._
val tweets = TwitterUtils.createStream(ssc, None)
3, 处理数据流
DStream 提供了两类操 作:转换和l输出操作。转换可以进一步细分成如下几类:基本转换、聚合转换、键值对转换 、 特殊转换。
3.1 基本转换
map
val lines = ssc.socketTextStream(”localhost", 9999)
val lengths = lines map {line => line.length}
flatMap
val lines = ssc.socketTextStream(”localhost", 9999)
val words = lines flatMap {line => line.split(””)}
filte
val lines = ssc.socketTextStream(”localhost”, 9999)
val nonBlanklines = lines filter {line => line.length > o}
repairtion
val inputStream = ssc.socketTextStream(”localhost”, 9999)
inputStream.repartition(lO)
union
val streaml = ...
va2 streaml = ...
val combinedStream = strearm1.union(stream2)
3.2 聚合转换
count
val inputStream = ssc.socketTextStream(“localhost", 9999)
val countsPerRdd = inputStrear『1.count()
reduce
val lines = ssc. socketTextStream(”localhost”, 9999)
val words = lines flatMap {line => line.split(”..)}
val longestWords = words reduce { (wl, w2) => if(wl. length > w2. length)w1 else w2}
countByValue
val lines = ssc.socketTextStream("localhost”, 9999)
val words = lines flatMap {line => line.split(" ")}
val wordCounts = words.countByValue()
3.3 键值对转换
cogroup
在由( K, Seq[V])构成的 DStream 和由( K, Seq[W])构成的 DStream 上执行 cogroup 方法,将返回一个由恨, Seq[呵, Seq[WJ)构成的 DStreamo cogroup 方法实际上在原 DStream 的 RDD 和作为参数的 DStream 的 RDD 上执行 cogroup 操作。
val linesl = ssc.socketTextStream(”localhost”, 9999)
val words1 = lines1 flatMap {line => line.split(””)}
val wordlenPairsl = wordsl map {w => (w.length, w)}
val wordsBylenl = wordlenPairsl. groupByKey
val lines2 = ssc.socketTextStream(”localhost”, 9998)
val words2 = lines2 flatMap {line => line.split(”..)}
val wordlenPairs2 =队mds2 map {w => (w.length, w)}
val wordsBylen2 = wordLenPairs2.groupByKey
val wordsGroupedByLen = wordsByLen1.cogroup(wordsByLen2)
上面的例子展示如何使用 cogroup 方法来找出两个 DStream 中长度一样的单词。
join
join 方法把一个由键值对构成的 DStream 作为参数,返回一个新的 DStream。 返回的 DStream 是原 DStream 和作为参数的 DStream 做内连接的结果。当 join 方法作用于由( K, V)构成的 DStream 和由( K, W)构成的 DStream 时,将返回-个由( K, ( V, W))构成的
DStream.
val linesl = ssc.socketTextStream("localhost飞 9999)
val wordsl = linesl flatMap {line => line.split(…·)}
val wordLenPairs1 = words1 map {w => (w.length, w)}
val lines2 = ssc.socketTextStream(”localhost”, 9998)
val words2 = lines2 flatMap {line => line.split(””)}
val wordLenPairs2 = words2 map {w => (w. length, w)}
val wordsSamelength = wordlenPairst.join(wordLenPairs2)
上面的例子中创建了两个由文本行构成的 DStreamo 然后,将它们各自变成由单词构 成的 DStream。
groupByKey
group By Key 方法根据构成 DStream 的 RDD 中的键将这些 RDD 中的元素进行分 组。 它返回一个新的 DStream,这个 DStream 实际上由在构成原 DStream 的 ROD 上进行 groupByKey 方法后的得到的 RDD 构成。
val lines = ssc.socketTextStream(”localhost”, 9999)
val words = lines 于latMap {line => line.split("”)}
val wordLenPairs = words map {w => (w.length, w)}
val wordsBylen = wordlenPairs.groupByKey
reduceByKey
reduceByKey 方法返回一个新的键值对 DStreamc 它把用户定义的函数作用于原 DStream 的每一个 RDD 的元素上,由此得到新的键值对构成了返回的 DStream.
下面的例子展示了如何统计 DStream 每一个批次中每个单词出现的个数。
val lines = ssc.socketTextStream("localhost”, 9999)
val words = lines flatMap {line => line.split(””)}
val wordPairs = words map { word => (word, 1)}
val wordconts = wordPairs. reduceByKey(_ + _)
3.4 特殊转换
至今为止,我们介绍的转换操作都可以操作 DStream 中的元素。 实际上, DStream 是 把它们转变成 RDD 操作实现的。 下面介绍的转换操作却不是这样的模式。
transform
transform 方法返回一个 DStream,这个 DStream 是通过将一个参数为 ROD 并且返回 值也是 ROD 的函数作用在原 DStream 上的每一个 RDD 得到的。 transform 方法的参数就 是那个参数是 RDD 并且返回值也是 RDD 的函数。 通过 transform 方法可以直接访问构成 DStream 的 ROD。
transform 方法使得你可以使用 RDD API 提供的方法,但是只能使用那些 DStream API 没有的。举例来说, sortBy 是 RDD API 提供的一个方法,但是 DStream API 没有这个方 法。如果你想对构成 DStream 的每一个 RDD 中的元素进行排序,可以像如下例子这样使用 transform 方法来做这件事。
val lines = ssc.socketTextStream(”localhost", 9999)
val words = lines.flatMap{line => line.split(" ”)}
val sorted= words.transform{rdd => rdd.sortBy((w)=> w)}
数据流上使用机器学习算法和图计算算法的时候, transform 方法就显得十分有用了 。 机器学习和图计算库提供的类与方法一般都是作用在 RDD 这一级别上的。 在 transform 方 法中,就可以使用这些类提供的 API。
updateStateByKey
updatestateB y Key 方法使得你在处理一个键值对 DStream 时为每一个键创建状态、更 新状态。 可以使用这个方法为 DStream 中的每一个键维护一些信息。 举例来说,可以使用 updateStateByKey 方法来计算一个 DStraem 中每一个单词出现的 次数,代码如下所示。
val lines = ssc.socketTextStrea叫”localhost”, 9999)
val words = lines.flatMap{line => line.split(””)}
val wordPairs = words.map{word => (word, 1)}
val updateState = (xs: Seq[Int], prevState: Option[Int]) => {
prevState match {
case some(prevcount) => some(prevcount + xs.sum)
case None => Some(xs.sum)
}
}
val runningCount = wordPairs.updateStateByKey(updateState)
4, 保存至文件系统
- saveAsTextFiles
- saveAsObjectFiles
- saveAsHadoopFiles
- saveAsNewAPIHadoopFiles
5, 在控制台上显示
val ssc = new StreamingContext(conf, Seconds(interval))
val lines = ssc.socketTextStream(”local host”, 9999)
val words = lines flatmap {line => line.split(””)}
val longwords = words filter { word => word.length > 3}
longWords.print(s)
6, 保存至数据库中
DStream 类中的 foreachRDD 方法可以将 DStream 的处理结果保存至数据库中。
下面的代码片段展示了如何像上面描述的那样把 DStream 保存至数据库中。
resultDStream.foreachRDD { rdd => rdd.foreachPartition { iterator =>
val dbConnection = ConnectionPool.getConnection()
val statement = dbConnection. createStatement()
iterator.foreach {element =>
val result = statement.executeUpdate(”...”)
//check the result
statement.close()
//return connection to the pool
dbConnection.close()