Spark Streaming例子

Spark Streaming 概要

在这里插入图片描述
在内部,其按如下方式运行。Spark Streaming接收到实时数据流同时将其划分为分批,这些数据的分批将会被Spark的引擎所处理从而生成同样按批次形式的最终流。
在这里插入图片描述

例子

在深入了解如何编写你自己的SS程序之前,让我们先迅速浏览下基本的SS程序是什么样的。假设我们想统计文本数据中单词个数(数据来自于监听一个TCP接口的数据服务器)。你只需要这样做:
第一步,加载入StreamingContext,这个是所有流功能函数的主要访问点,我们使用两个执行线程和1s的批次间隔来创建本地的StreamingContext:

maven

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.2.1</spark.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>


  </dependencies>
package com.hx.test

/**
 * fileName : Test11StreamingWordCount 
 * Created by 970655147 on 2016-02-12 13:21.
 */
object Test11StreamingWordCount {

  // 基于sparkStreaming的wordCount
  // 环境windows7 + spark1.2 + jdk1.7 + scala2.10.4
  // 1\. 启动netcat [nc -l -p 9999]
  // 2\. 启动当前程序
  // 3\. netcat命令行中输入数据
  // 4\. 回到console, 查看结果[10s 之内]
  // *******************************************
  // 每一个print() 打印一次
  // -------------------------------------------
  // Time: 1455278620000 ms
  // -------------------------------------------
  // Another Infomation !
  // *******************************************
  // inputText : sdf sdf lkj lkj lkj lkj
  // MappedRDD[23] at count at Test11StreamingWordCount.scala:39
  // 2
  // (sdf,2), (lkj,4)
  def main(args :Array[String]) = {

    // Create a StreamingContext with a local master
    // Spark Streaming needs at least two working thread
    val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = sc.socketTextStream("192.168.47.141", 9999)
     // Split each line into words
    // 以空格把收到的每一行数据分割成单词
    val words = lines.flatMap(_.split(" "))
    // 在本批次内计单词的数目
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // 打印每个RDD中的前10个元素到控制台
    wordCounts.print()

    sc.start()
    sc.awaitTermination()

  }

}

words DStream 进一步被映射成(一对一的转换)(word,1)对的DStream形式,这个“对”形式的DStream将会被reduced(一个Spark操作)以取得数据各个批次中单词的统计。最后,wordCounts.print()打印处每一秒所获得的少量计数值。

注意:这么多行代码被执行后,SS仅仅设置了其若开始运行将要进行的运算,但是并没有开始真正意义上的处理。在所有的转换都部署完毕后,我们需要调用下面两个操作来真正启动处理:

ssc.start() # 开始计算
ssc.awaitTermination() # 等待计算终止

如果你已经下载并编译了Spark,就可以按如下讲解来运行这个例子。首先你需要运行Netcat(大多数类Unix系统都有的工具)作为数据服务器:

$ nc -l 9999

运行netcat终端上的任何键入的行将会被计算并打印到屏幕上。

# TERMINAL 1:
# Running Netcat
$ nc -l 9999
hello world
# TERMINAL 2: RUNNING network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
(hello,1)
(world,1)

在这里插入图片描述
这些底层的RDD转换是通过Spark引擎计算的。DStream操作隐藏了大多数细节,同时为了方便为开发者提供了一个高层的API。这里的一些操作会在下文中详述

发布了79 篇原创文章 · 获赞 8 · 访问量 2万+

猜你喜欢

转载自blog.csdn.net/qq_34219959/article/details/102807169