要初始化一个 Spark Streaming 程序,必须创建一个 StreamingContext 对象,该对象是 SparkStreaming 流处理的编程入口点。2.2版本SparkSession未整合StreamingContext,所以仍需单独创建。
创建 StreamingContext 对象
Spark Streaming 初始化的主要工作是创建 Streaming Context 对象,通过创建函数的参数指明 Master Server,设定应用名称,指定 Spark Streaming 处理数据的时间间隔等。
一个JVM只能有一个StreamingContext启动。
StreamingContext停止后不能再启动。
Spark Streaming 程序开发流程:
一个StreamingContext定义之后,必须执行以下程序进行实时计算的执行
- 定义 StreamingContext
- 通过 StreamingContext API 创建输入 DStream(Input DStream)来创建输入不同的数据源
- 对DStream定义transformation和output等各种算子操作,来定义我们需要的各种实时计算逻辑。
- 调用StreamingContext的start()方法,进行启动我们的实时处理数据。
- 调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
- 也可以通过调用StreamingContext的stop()方法,来停止应用程序。
创建 StreamingContext两种方法
方法一:通过 SparkConf 创建
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
方法二:通过 SparkContext 创建,通常是使用已有的 SparkContext 来创建 StreamingContext
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
其中 Seconds(1)表示批处理间隔。
注意:
创建 StreamingContext需要注意下面几个问题:
- 一个 JVM 只能有一个 SparkContext 启动。意味着应用程序中不应该出现两个 SparkContext。
- 一个 JVM 同时只能有一个 StreamingContext 启动。但一个SparkContext可以创建多个 StreamingContext,只要上一个treamingContext 先用 stop(false)停止,再创建下一个即可。默认调用stop()方法时,会同时停止内部的SparkContext。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false。
- StreamingContext 停止后不能再启动。也就是说调用 stop()后不能再 start()。
- StreamingContext 启动之后,就不能再往其中添加任何计算逻辑了。也就是说执行 start()方法之后,不能再使 DStream 执行任何算子。
- 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。
下面以代码的形式来简单介绍下创建StreamingContext来执行SparkStreaming的整个流程:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
//创建StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf,Seconds(1))
// 指定数据源,创建连接至 hostname:port 的 DStreamCreate,如 localhost:9999
val lines = ssc.socketTextStream("localhost",9999)
//把每一行通过空格符分割成多个字符
val words = lines.flatMap (_.split(" "))
//对输入的流进行操作
val pairs = words.map(x => (x, 1))
val wordCounts = pairs.reduceByKey(_+_)
//打印该 DStream 生成的每个 RDD 中的字符
wordCounts.print()
//通过start()启动消息采集和处理
ssc.start()
//启动完成后就不能再做其它操作
//等待计算完成
ssc.awaitTermination()
这样一个基本的Spark Streaming 运行流程就完成了。