scala-wordcount详解

 用scala语言写wordcount,主要也就那么几个流程,和mapreduce的思想步骤还是差不多的。下面我们来详细讲解一下:先看一下代码

import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Date: 2018/12/17 9:03
  */
object wc_scala {

  def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("wc_scala")/*.setMaster("local")*/;

      val sc = new SparkContext(conf);
      
      val lines = sc.textFile("hdfs://192.168.33.100:9000/spark.txt",1)

      val words = lines.flatMap{line => line.split(" ")};
      val pairs = words.map{word => (word,1)}
      val wordcounts = pairs.reduceByKey(_+_);

      wordcounts.foreach(wordcount => println(wordcount._1+ " : "+wordcount._2+" times"))
  }
}

 

 val conf = new SparkConf().setAppName("wc_scala")/*.setMaster("local")*/;

 首先看第一行,创建SparkConf对象,设置Spark应用配置信息,setAppName是设置你运行程序的名字,使用setMaster设置Spark程序连接的集群的master,local代表本地(我注释掉了)

val sc = new SparkContext(conf);

创建JavaSparkContext对象 spark中所有功能的入口都是SparkContext,它的工作:初始化spark程序所需要的一些核心组件(调度器等),还会到spark的master上进行注册 用scala-->SparkContext,Java-->JavaSparkContext等

val lines = sc.textFile("hdfs://192.168.33.100:9000/spark.txt",1)

针对输入源(hdfs文件、本地文件),例如txt文件中有三行“hello you”和三行“hello me”--->如图第1步

创建出RDD(RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集), 输入源中的数据会被打散,分配到RDD的各个partition,形成初始的分布式数据集.RDD有元素概念,对于HDFS和本地文件,创建的RDD,每一个元素相当于文本的一行  

val words = lines.flatMap{line => line.split(" ")};

对每行的元素进行切分,分成单个单词--->第2步

对初始RDD进行transformation操作 创建Function,并配合RDD的map,flatmap等算子操作。 function,简单的话用匿名内部类,复杂的话会创建一个类作为实现function的接口类 先把每行元素拆分成单个单词 FlatMapFunction,两个泛型参数,代表输入输出类型

val pairs = words.map{word => (word,1)}

将每个word映射成(word,1)--->第3步

//mapToPair-->每个元素映射成(v1,v2)的Tuple.这个算子配合partition使用,三个参数-->输入类型,tuple的两个值类型 //JavaPairRDD的泛型参数代表tuple元素的第一个和第二个值的类型

val wordcounts = pairs.reduceByKey(_+_);

通过pairs的key值,把value值相加,这中间经过了spark的shuffle过程

//统计单词出现次数--->reduceByKey,对每个key对应的value进行reduce

//(hello, 1) (hello, 1) (hello, 1) (world, 1)-->相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3

 wordcounts.foreach(wordcount => println(wordcount._1+ " : "+wordcount._2+" times"))

对于flatMap,mapToPair,reduceByKey等都是transformation操作,必须有action触发执行操作

好了,spark中用scala语法写wordcount的程序就写好了

猜你喜欢

转载自blog.csdn.net/S_Running_snail/article/details/85055799