1. Prepare a text file named sparkStreamingWordFrep.txt with the following content:
this is a processing of the sparkStreaming data learn use I can process spark it big streming
data learn use I can process spark it big streming
to want I can data learn use I can process spark it big streming
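The file can be created with any editor; as a minimal sketch, it can also be written from Scala (the object name `WriteSampleFile` is hypothetical, and the file is written to the working directory rather than the `D://jar/` path the main program reads):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object WriteSampleFile {
  def main(args: Array[String]): Unit = {
    // The three sample lines from above, joined with newlines
    val lines =
      "this is a processing of the sparkStreaming data learn use I can process spark it big streming\n" +
        "data learn use I can process spark it big streming\n" +
        "to want I can data learn use I can process spark it big streming\n"
    // Write the sample file next to the program; adjust the path as needed
    Files.write(Paths.get("sparkStreamingWordFrep.txt"),
      lines.getBytes(StandardCharsets.UTF_8))
  }
}
```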
2. Set up a Maven-managed project
<properties>
<scala.version>2.11.8</scala.version>
</properties>
<repositories>
<repository>
<id>repos</id>
<name>Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>repos</id>
<name>Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</pluginRepository>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency> <!-- Spark core; the Scala binary version in the artifact suffix (_2.11) must match the scala-library version,
otherwise the job fails with java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object; -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!-- The scala-library version must match the Scala suffix (_2.11) of the Spark artifacts above -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordFreq</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>org.jy.data.yh.bigdata.drools.scala.sparkstreaming.SparkStreamingWordFreq</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
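The `NoSuchMethodError` mentioned in the comments above is almost always a Scala binary-version mismatch between the Spark artifacts and scala-library. As a quick diagnostic (the object name `ScalaVersionCheck` is hypothetical, not part of the project), the Scala version actually on the classpath can be printed and compared against the `_2.11` suffix:

```scala
object ScalaVersionCheck {
  def main(args: Array[String]): Unit = {
    // Prints the version of the scala-library on the classpath, e.g. "2.11.8";
    // its "2.11" prefix must match the _2.11 suffix of the Spark artifacts.
    println(scala.util.Properties.versionNumberString)
  }
}
```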
3. The complete code is as follows (note: despite the SparkStreaming naming, the program uses the Spark core RDD API, not Spark Streaming):
package org.jy.data.yh.bigdata.drools.scala.sparkstreaming
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark word count over a text file: read the data, split it on spaces,
 * map each word to a <word, 1> key-value pair, and finally reduce by key
 * to total the occurrences of each word.
 *
 * flatMap splits each input line on spaces into a sequence of words, map turns
 * each word into a (word, 1) pair, and reduceByKey merges the pairs by word,
 * yielding the frequency of each word.
 * (Note: this step runs distributed across all Workers in the cluster;
 * each Worker may compute only a small part of the result.)
 */
object SparkStreamingWordFreq {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf object
    val sparkConf = new SparkConf()
    sparkConf.setAppName("SparkStreamingWordFreq") // Application name, shown in the Spark Web UI
    sparkConf.setMaster("local[*]") // Run in local mode
    // Create the SparkContext object
    val sparkContext = new SparkContext(sparkConf)
    // The data source is a text file
    val txtFile = "D://jar/sparkStreamingWordFrep.txt"
    // Read the contents of the text file
    val txtData = sparkContext.textFile(txtFile)
    // Cache the text RDD
    txtData.cache()
    // Count the lines (an action, which also materializes the cache)
    txtData.count()
    // Split on spaces and count word frequencies
    val wcData = txtData.flatMap { line => line.split(" ") }
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Collect the RDD to the driver and print it
    wcData.collect() // returns an array
      .foreach(e => println(e))
    sparkContext.stop()
  }
}
}
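The same flatMap / map / reduce-by-key pipeline can be sketched on ordinary Scala collections, without a Spark cluster, to see the logic in isolation (the object and method names here are hypothetical; `groupBy` plus a per-group sum plays the role of `reduceByKey`):

```scala
object LocalWordFreqSketch {
  // Mirror of the Spark job on plain collections: split lines into words,
  // build (word, 1) pairs, then merge the pairs by word.
  def wordFreq(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "data learn use I can process spark it big streming",
      "to want I can data learn use I can process spark it big streming"
    )
    wordFreq(lines).foreach(println)
  }
}
```

Unlike the RDD version, everything here runs on the driver in one pass; Spark's `reduceByKey` additionally combines partial counts per partition before shuffling.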
4. Run the program in IntelliJ IDEA; the word-count output is as follows:
(learn,3)
(this,1)
(is,1)
(can,4)
(big,3)
(data,3)
(,1)
(want,1)
(it,3)
(spark,3)
(process,3)
(a,1)
(streming,3)
(processing,1)
(sparkStreaming,1)
(I,4)
(to,1)
(of,1)
(use,3)
(the,1)
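The `(,1)` entry above is not a counting bug: the input text contains two consecutive spaces, and `String.split(" ")` keeps interior empty tokens, so one empty-string "word" is counted. A minimal sketch of the effect, with a possible fix (adding a `filter` before counting, which the original program does not do):

```scala
object EmptyTokenDemo {
  def main(args: Array[String]): Unit = {
    // Two consecutive spaces produce an interior empty token
    val tokens = "big  streming".split(" ")
    println(tokens.toList) // List(big, , streming)
    // Dropping empty tokens before the (word, 1) mapping removes the ("", 1) entry
    println(tokens.filter(_.nonEmpty).toList) // List(big, streming)
  }
}
```

In the Spark job, the corresponding change would be `.flatMap(_.split(" ")).filter(_.nonEmpty)` before the `map` step.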