离散流或 DStream 是 Spark Streaming 提供的高级别抽象。它表示连续的数据流,可以是从源接收到的输入数据流,也可以是通过转换输入流生成的经过处理的数据流。
在内部,DStream 由一系列连续的 RDDs 表示,RDDs 是 Spark 对不可变的分布式数据集的抽象。DStream 中的每个 RDD 包含来自某个间隔(batch interval)的数据,如下图所示:
应用于DStream上的任何操作都转换为底层RDDs上的操作。
Input DStreams 与 Receivers(接收器)
Spark Streaming 的所有操作都是基于流的,而输入源是这一系列操作的起点。
Input DStreams 是表示从流媒体源接收的输入数据流的 DStream,通常是第一个 DStream。在上篇文章中提到的示例中,行是一个输入 DStream,因为它表示从 netcat 服务器接收到的数据流。每个 Input DStream(除后面讨论的文件流外)都与一个接收方(Receiver)对象相关联,接收方接收来自源的数据并将其存储在 Spark 内存中进行处理。
输入 DStream 和 DStream 接收的流都代表输入数据流的来源,Spark Streaming 提供了两种内置数据流来源:基础来源和高级来源。
基础来源
基础来源是在 StreamingContext API 中直接可用的来源,如文件系统、Socket (套接字)连接等。
在上一篇的文章的例子中已经使用了 ssc.socketTextStream() 方法,即通过 TCP 套接字连接,从文本数据中创建一个 DStream。除了套接字之外,StreamingContext 的 API 还提供了从文件和 Akka actors 中创建 DStreams 作为输入源的方法。
Spark Streaming 提供了streamingContext.fileStream(dataDirectory) 方法,该方法可以从任何文件系统(如 HDFS、S3、NFS 等)的文件中读取数据,然后创建一个 DStream。
Spark Streaming 监控 dataDirectory 目录和在该目录下的所有文件的创建处理过程。需要注意的是,文件必须是具有相同的数据格式的,创建的文件必须在 dataDirectory 目录下。对于简单的文本文件,可以使用一个简单的方法 StreamingContext.textFileStream(dataDirectory) 来读取数据。
Spark Streaming 也可以基于自定义 Actors 的流创建 DStream。通过 Akka actors 接收数据流的使用方法是 streamingContext.actorStream(actorProps,actor—name)。
Spark Streaming 使用 streamingContext.queueStream(queueOfRDDs)方法可以创建基于 RDD 队列的 DStream,每个 RDD 队列将被视为 DStream 中的一块数据流进行加工处理。
高级来源
高级来源,如 Kafka、Flume、Kinesis、Twitter 等,可以通过额外的实用工具类来创建。高级来源需要外部 non-Spark 库的接口,其中一些有复杂的依赖关系(如 Kafka、Flume)。因此通过这些来源创建 DStreams 需要明确其依赖。
例如,如果想创建一个使用 Twitter tweets 的数据的 DStream 流,必须按以下步骤来做。
- 在 sbt 或 maven 工程里添加 spark-streaming-twitter_2.10 依赖。
- 开发:导入 TwitterUtils 包,通过 TwitterUtils.createStream 方法创建一个 DStream。
- 部署:添加所有依赖的 Jar 包,然后部署应用程序。
特别注意:
需要注意的是,这些高级来源一般在 Spark Shell 中不可用,因此基于这些高级来源的应用不能在 Spark Shell 中进行测试。如果必须在 Spark Shell 中使用它们,则需要下载相应的 maven 工程的 Jar 依赖并添加到类路径中。另外,输入 DStream 也可以创建自定义的数据源,需要做的就是实现一个用户定义的接收器。
可以在流处理程序中并行的接收多个数据流,即创建多个 Input DStreams。
这将创建同时接收多个数据流的多个 receivers(接收器)。但需要注意,一个Spark 的 worker/executor 是一个长期运行的任务(task),因此它将占用分配给Spark Streaming 的应用程序的所有核中的一个核(core)。
因此,要记住,一个Spark Streaming 应用需要分配足够的核(core)(或线程(threads),如果本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s))。
注意:
- 在本地运行 Spark Streaming 程序时,不要使用“local”或“local[1]”作为主URL。这两种方法都意味着只有一个线程将用于在本地运行任务。如果使用基于接收器的输入 DStream(例如 sockets、Kafka、Flume 等),那么将使用单个线程来运行接收器。因此,在本地运行时,始终使用“local[n]”作为主 URL,其中要运行 n 个接收方。
- 在集群上运行时,分配给 Spark Streaming 应用程序的内核数量必须大于接收器的数量。否则,系统将接收数据,但无法处理它。
基础数据源示例
文件流(File Streams)
import org.apache.spark.streaming._
val ssc =new StreamingContext(sc,Seconds(8))
ssc.textFileStream("/test/sparkstreaming/helloworld/").print
ssc.start
注 意 textFileStream() 参 数 必 须 是 文 件 目 录 , 但 可 以 支 持 通 配 符 如
“hdfs://namenode:8020/logs/2017/*”。
Spark 将监视该目录任务新建的文件,一旦有新文件才会处理。所有文件要求有相同的数据格式,并且监视文件的修改时间而不是创建时间,注意更新文件内容不会被监视,一旦开始处理,这些文件必须不能再更改。
因此如果文件被连续地追加,新的数据也不会被读取。文件流不需要运行接收器,因此,不需要分配内核。
Socket(TCP Socket)
在上一篇文章中已经举例过,在这里就不多说了。使用的就是Socket 数据源。
自定义数据源
自定义数据源即自定义 Receiver。自定义接收器必须通过实现两个方法来扩展 Receiver 抽象类。
- onStart():开始接收数据要做的事情。
- onStop():停止接收数据的操作。
onStart()和 onStop()不能无限阻塞。通常,onStart()将启动负责接收数据的线程,而 onStop()将确保这些接收数据的线程被停止。接收线程也可以使用isStopped()方法来检查它们是否应该停止接收数据。
一旦接收到数据,就可以通过调用 store(data)将数据存储在 Spark 中,这是Receiver 类提供的方法。store()有多种形式,可以一次存储接收到的数据记录,也可以作为对象/序列化字节的整个集合。注意,用于实现接收器的 store()的风格会影响其可靠性和容错语义。
接收线程中的任何异常都应该被捕获并正确处理,以避免接收方的无声故障。restart()将通过异步调用 onStop()和延迟后调用 onStart()来重新启动接收器。stop()将调用 onStop()并终止接收方。此外,reportError()在不停止/重新启动接收器的情况下向驱动程序报告错误消息(在日志和 UI 中可见)。
自定义 Socket 接收器示例:
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
//启动接收线程
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
def onStop() {
}
/** 创建 Socket 连接并接收数据直到 receiver 停止*/
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// 连接到 host:port
socket = new Socket(host, port)
//读 Socket
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(),
StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput) //存储接收到的数据,抽象类已实现
userInput = reader.readLine()
}
reader.close()
socket.close()
// 重新启动,当服务器再次激活时尝试重新连接
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// 如果无法连接到服务器,重新启动
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// 如果有任何其他错误,重新启动
restart("Error receiving data", t)
}
}
}
//调用示例:
// ssc 为 StreamingContext
val customReceiverStream=ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))