Spark Streaming 概要
在内部,其按如下方式运行。Spark Streaming接收到实时数据流同时将其划分为分批,这些数据的分批将会被Spark的引擎所处理从而生成同样按批次形式的最终流。
例子
在深入了解如何编写你自己的SS程序之前,让我们先迅速浏览下基本的SS程序是什么样的。假设我们想统计文本数据中单词个数(数据来自于监听一个TCP接口的数据服务器)。你只需要这样做:
第一步,加载入StreamingContext,这个是所有流功能函数的主要访问点,我们使用两个执行线程和1s的批次间隔来创建本地的StreamingContext:
maven
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.2.1</spark.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
</dependencies>
package com.hx.test
/**
* fileName : Test11StreamingWordCount
* Created by 970655147 on 2016-02-12 13:21.
*/
object Test11StreamingWordCount {
// 基于sparkStreaming的wordCount
// 环境windows7 + spark1.2 + jdk1.7 + scala2.10.4
// 1\. 启动netcat [nc -l -p 9999]
// 2\. 启动当前程序
// 3\. netcat命令行中输入数据
// 4\. 回到console, 查看结果[10s 之内]
// *******************************************
// 每一个print() 打印一次
// -------------------------------------------
// Time: 1455278620000 ms
// -------------------------------------------
// Another Infomation !
// *******************************************
// inputText : sdf sdf lkj lkj lkj lkj
// MappedRDD[23] at count at Test11StreamingWordCount.scala:39
// 2
// (sdf,2), (lkj,4)
def main(args :Array[String]) = {
// Create a StreamingContext with a local master
// Spark Streaming needs at least two working thread
val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = sc.socketTextStream("192.168.47.141", 9999)
// Split each line into words
// 以空格把收到的每一行数据分割成单词
val words = lines.flatMap(_.split(" "))
// 在本批次内计单词的数目
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// 打印每个RDD中的前10个元素到控制台
wordCounts.print()
sc.start()
sc.awaitTermination()
}
}
words DStream 进一步被映射成(一对一的转换)(word,1)对的DStream形式,这个“对”形式的DStream将会被reduced(一个Spark操作)以取得数据各个批次中单词的统计。最后,wordCounts.print()打印处每一秒所获得的少量计数值。
注意:这么多行代码被执行后,SS仅仅设置了其若开始运行将要进行的运算,但是并没有开始真正意义上的处理。在所有的转换都部署完毕后,我们需要调用下面两个操作来真正启动处理:
ssc.start() # 开始计算
ssc.awaitTermination() # 等待计算终止
如果你已经下载并编译了Spark,就可以按如下讲解来运行这个例子。首先你需要运行Netcat(大多数类Unix系统都有的工具)作为数据服务器:
$ nc -l 9999
运行netcat终端上的任何键入的行将会被计算并打印到屏幕上。
# TERMINAL 1:
# Running Netcat
$ nc -l 9999
hello world
# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
(hello,1)
(world,1)
这些底层的RDD转换是通过Spark引擎计算的。DStream操作隐藏了大多数细节,同时为了方便为开发者提供了一个高层的API。这里的一些操作会在下文中详述