spark读书笔记(2)-键值对

键值对RDD 通常用来进行聚合计算。我们一般要先通过一些初始ETL(抽取、转化、装载)操作来将数据转化为键值对形式。

1.创建Pair RDD

Spark 为包含键值对类型的RDD 提供了一些专有的操作,称之为Pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
python使用第一个单词作为键创建出一个pair RDD

pairs = lines.map(lambda x: (x.split(" ")[0], x))

当用Python 从一个内存中的数据集创建pair RDD 时,只需要对这个由二元组组成的集合调用SparkContext.parallelize() 方法。

2.Pair RDD的转化操作

拿上中的pair RDD,筛选掉长度超过20 个字符的行,

result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)

有时,我们只想访问pair RDD 的值部分,这时操作二元组很麻烦。由于这是一种常见的使用模式,因此Spark 提供了mapValues(func) 函数,功能类似于map{case (x, y): (x,func(y))}。可以在很多例子中使用这个函数。

聚合操作

pair RDD上有相应的针对键的转化操作,可以组合具有相同键的值。这些
操作返回RDD,因此它们是转化操作而不是行动操作。
reduceByKey(),接收函数,并根据函数对值进行合并。对数据的每个键进行并行归约操作,返回RDD。
在Python 中使用reduceByKey() 和mapValues() 计算每个键对应的平均值

rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

调用reduceByKey() 和foldByKey() 会在为每个键计算全局的总结果之前先自动在每台机器上进行本地合并。
用Python 实现单词计数

rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

事实上,我们可以对第一个RDD 使用countByValue() 函数,以更快地实现单词计数:input.flatMap(x => x.split(” “)).countByValue()。
combineByKey()基于键进行聚合的函数。
实现过程:遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作createCombiner() 的函数来创建那个键对应的累加器的初始值。(这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD 中第一次出现一个键时发生)。如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并。最后使用mergeCombiners() 方法将各个分区的结果进行合并。
在Python 中使用combineByKey() 求每个键对应的平均值

sumCount = nums.combineByKey((lambda x: (x,1)),
(lambda x, y: (x[0] + y, x[1] + 1)),
(lambda x, y: (x[0] + y[0], x[1] + y[1])))
sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

理解:第一行参数,createCombiner() 。x[0]保存value,x[1]保存个数,新键个数记 1。第二行,mergeValue(),x[0]累加,x[1]加一。第三行,mergeCombiners() 接收两个分区键值对,value与个数分别求和。

并行度调优

猜你喜欢

转载自blog.csdn.net/sky_noodle/article/details/81485934