概述:
Structured Streaming 是一个构建在Spark SQL 引擎上,可扩展,容错的的流处理引擎。您可以像编写静态数据的批处理程序一样,编写流处理程序。Spark SQL 引擎会增量的运行流处理程序,并持续的,在有流数据到来时更新最终结果。您可以使用DataSet/DataFrame API 来编写流聚合、事件窗口、流和批 join等操作(目前支持Scala,Java,Python和R)。这些计算都在优化后的Spark SQL 引擎上执行。系统会通过检查点和WAL来保证端对端的 exactly-once 容错语义。简单来说,Structured Streaming 提供了快速、可扩展、容错、 exactly-once 容错语义的流处理,而不需要用户去关注流本身。
在内部,Structured Streaming 默认使用微批处理引擎来处理查询,它将数据当做一系列的小批次job来处理,在保证 exactly-once 的情况下获得了低至100ms的延迟。不过,在Spark 2.3 之后,Spark 引入一个新的低延迟处理模式,叫做 Continues Processing,这种新模式在保证exactly-once 语义的情况下延迟可以低至1ms。在不改变查询中Dataset/DataFrame 操作的情况下,你可以基于需求切换这个模式。
示例:
需求:假如您想运行一个 word count程序,计算从 TCP socket接收到的文本中的单词数。
依赖:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
代码:
package com.ccclubs.spark
import org.apache.spark.sql.SparkSession
/**
* Author: xianghu.wang
* Date: 2018/11/2
* Description: StructuredStreamingDemo
*/
object StructuredStreamingDemo {
def main(args: Array[String]): Unit = {
// 创建Spark程序入口
val sparkSession = SparkSession
.builder()
.appName("StructuredNetworkWordCount")
.master("local[*]")
.getOrCreate()
import sparkSession.implicits._
// 创建监听 localhost:9999 的DataFrame流
val lines = sparkSession.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.load()
// 将行数据分割成单词
/**
* lines DataFrame代表一个包含流文本数据的无界表,这个表只有一列数据, 列名为“value”。
* 流文本数据中的每一行都会成为表的一行。
* 为了使用 flatMap函数,我们使用.as[String]方法将DataFrame转换为DataSet[String]
*/
val words = lines.as[String]
.flatMap(_.split(" "))
// 计算 word count
val wordCounts = words.groupBy("value").count()
// 开始查询,把查询结果打印在控制台(完整模式)
/**
* 输出模式有三种,complete,append,update:
* Complete Mode:输出所有结果
* Append Mode: 只输出当次批次中处理的结果(未和之前处理的结果合并)
* Update Mode: 只输出结果有变化的行
*/
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
// 执行
query.awaitTermination()
}
}
netcat 输入:
运行结果:
每次输入触发一次计算,所以netcat的每一行输入对应一个Batch。