【Spark】Spark常用方法总结2-RDD的使用(Python版本)

生成RDD

rdd1 = sc.parallelize([['zhangsan', 'M', 29], ['lisi', 'F', 22], ['wangwu', 'M', 30]])
rdd2 = sc.textFile(r'E:\常用基础数据\po.csv')

collect、collectAsMap、first

collect获取全部rdd元素, 生成list
collectAsMap获取键值对, 如果RDD中同一个Key中存在多个Value,后面的value将会把前面的value覆盖
first 获取第一个元素

rdd1_list = rdd1.collect()

sample

withReplacement 取出后是否放回
fraction 取出比例
seed 随机数, 避免同一个程序里面多个随机取值太过相似
返回值为一个新的rdd

rdd1_sample = rdd1.sample(False, 0.6, 1)
print(rdd1_sample.collect())
sort 排序

ascending 正序排序, 默认为正序
numPartitions 分区数, 不知道干嘛的
keyfunc 获取key, 默认为tuple或者list的第一个

rdd1_sorted = rdd1.sortByKey(ascending=False)
rdd1_sorted2 = rdd1.sortBy(ascending=False, keyfunc=lambda x: x[1])
print(rdd1_sorted.collect())
print(rdd1_sorted2.collect())

map、flatMap, mapValues ,flatMapValues

对每一个元素进行处理, 参数为rdd中的元素个体, flatMap将map扁平化flat()
preservesPartitioning 是否保留父rdd的partitioner分区信息, 不知所云

rdd1_map = rdd1.map(lambda x: x[2])
print(rdd1_map.collect())
rdd2 = sc.parallelize([
    [1, 'yingyu', 89],
    [1, 'shuxue', 99],
    [1, 'zhengzhi', 78],
    [2, 'shuxue', 78],
    [2, 'zhengzhi', 89],
    [3, 'jisuanji', 97],
    [3, 'huaxue', 67],
    [100, 'shengwu', 68]
])

当key-value类型的rdd, 对每一个key中的value进行操作. key不变. 可以配合groupby

print(rdd1.mapValues(lambda x: len(x)))

比起上面的多一个拆分动作

print(rdd1.flatMapValues(lambda x: len(x))

reduce、reduceByKey、fold、foldByKey

reduce 不解释
fold是提供了初始值的reduce

关联与join

cartesian笛卡尔积, rdd1中的单个元素作为整体分别和rdd2全关联
join 两边都有的

rdd1_rdd2_join = rdd1.join(rdd2)
print(rdd1_rdd2_join.collect())

outerJoin, 数据最全, 没有的就关联空

rdd1.fullOuterJoin(rdd2)

leftOuterJoin 左边的都有, 不管右边有没有

rdd1_rdd2_left_outer_join = rdd1.leftOuterJoin(rdd2)

rightOuterJoin 右边的都有

rdd1_rdd2_left_outer_join = rdd1.rightOuterJoin(rdd2)

示例

rdd1 = sc.parallelize([[1, 'A'], [2, 'B'], [3, 'C'], [4, 'D'], [5, 'E']])
rdd2 = sc.parallelize([[1, 'a'], [1, 'b'], [3, 'c'], [4, 'd'], [8, 'e']])

result1 = rdd1.cartesian(rdd2).collect()
result2 = rdd1.join(rdd2).collect()
result3 = rdd1.fullOuterJoin(rdd2).collect()
result4 = rdd1.leftOuterJoin(rdd2).collect()
result5 = rdd1.rightOuterJoin(rdd2).collect()

结果

[([1, 'A'], [1, 'a']), ([1, 'A'], [1, 'b']), ([1, 'A'], [3, 'c']), ([1, 'A'], [4, 'd']), ([1, 'A'], [8, 'e']), ([2, 'B'], [1, 'a']), ([2, 'B'], [1, 'b']), ([2, 'B'], [3, 'c']), ([2, 'B'], [4, 'd']), ([2, 'B'], [8, 'e']), ([3, 'C'], [1, 'a']), ([3, 'C'], [1, 'b']), ([3, 'C'], [3, 'c']), ([3, 'C'], [4, 'd']), ([3, 'C'], [8, 'e']), ([4, 'D'], [1, 'a']), ([4, 'D'], [1, 'b']), ([4, 'D'], [3, 'c']), ([4, 'D'], [4, 'd']), ([4, 'D'], [8, 'e']), ([5, 'E'], [1, 'a']), ([5, 'E'], [1, 'b']), ([5, 'E'], [3, 'c']), ([5, 'E'], [4, 'd']), ([5, 'E'], [8, 'e'])]
[(1, ('A', 'a')), (1, ('A', 'b')), (3, ('C', 'c')), (4, ('D', 'd'))]
[(1, ('A', 'a')), (1, ('A', 'b')), (2, ('B', None)), (3, ('C', 'c')), (4, ('D', 'd')), (5, ('E', None)), (8, (None, 'e'))]
[(1, ('A', 'a')), (1, ('A', 'b')), (2, ('B', None)), (3, ('C', 'c')), (4, ('D', 'd')), (5, ('E', None))]
[(1, ('A', 'a')), (1, ('A', 'b')), (3, ('C', 'c')), (4, ('D', 'd')), (8, (None, 'e'))]

sum

求和, 用之前检查是否全是数字类型

rdd1.map(lambda x: x[0]).sum()

计数

count 对元素个数进行计数

rdd1.count()

countApprox 不精确计数, 当超过timeout时, 返回一个未完成的结果。
print(rdd1.countApprox(timeout=50), 1)

countApproxDistinct 不精确计数, 参数是大概的误差指数 数据量*误差指数
print(rdd1.countApproxDistinct(0.01))

去重

去重, 实际上调用的是reduceByKey方法(做了一次WordCount)
print(rdd1.distinct().sortByKey().collect())

filter

取出满足参数的回调函数的数据, 生成一个新的RDD
print(rdd1.filter(lambda x: x[0] > 10))

聚合函数

def fun1(x1, x2) -> int:
    print('fun1 %d %d' % (x1, x2))
    return x1 + x2


def fun2(x1, x2) -> int:
    print('fun2 %d %d' % (x1, x2))
    return x1 + x2

aggregateByKey, aggregate

对于每一个分区每一个元素先执行fun1, 前面的执行结果作为后面的参数1
对于每一个分区, 把fun1执行的结果作为单个元素, 对于每一个分区计算结果先执行fun2, 前面的执行结果作为后面的参数1
例:

result1 = rdd1.aggregateByKey(zeroValue=100, seqFunc=fun1, combFunc=fun2)
结果
fun1 0 1
fun1 1 2
[Stage 1:===================> (1 + 1) / 3]
fun1 0 3
fun1 3 4
[Stage 1:===================> (2 + 1) / 3]
fun1 0 5
fun1 5 6
fun1 11 7
fun1 18 8
fun2 0 3
fun2 3 7
fun2 10 26

checkpoint、getCheckpointFile

‘’’
对于cache,若机器发生故障,内存或者磁盘中缓存的数据丢失时,就要根据lineage(血统)进行数据恢复,想象一下,如果在这之前有100个rdd,那么在要经过100次的转换,才能将数据恢复过来,这样效率非常低。
所以可以使用rdd的checkpoint机制(检查点,相当于快照),将你认为很重要的rdd存放到一个公共的高可用的存储系统中去,如hdfs,下次数据丢失时,就可以从前面ck的rdd直接进行数据恢复,而不需要根据lineage去从头一个一个的去恢复,这样极大地提高了效率。
首先要设置ck的存放的目录:
sc.setCheckpointDir(“hdfs://master:9000/rdd-checkpoint”) //使用hdfs做存储,如果文件目录不存在会创建一个新的
创建好后hdfs中会生成一个rdd-checkpoint目录,里面还会自动生成一个目录:
最好配合cache使用
‘’’

rdd1.cache().checkpoint()

cache、persist、getStorageLevel

cache 实际调用 self.persist(StorageLevel.MEMORY_ONLY)
getStorageLevel 返回RDD当前的存储级别,存储级别一旦确定,就不能再修改了。

StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
rdd1.cache()
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

对于RDD的partition的控制

‘’’
repartition这个方法可以增加或减少此RDD中的并行度。在内部,这使用shuffle来重新分配数据。
如果要减少RDD中的分区数量,请考虑使用“coalesce”,这样可以避免执行shuffle。
增加用repartition, 减少用coalesce
‘’’

rdd2 = rdd1.repartition(10)
print(rdd2.groupBy(lambda x: x[1][1]).reduceByKey(lambda x1, x2: x1).collect())
rdd2.collect()
print(rdd2.getNumPartitions())
rdd3 = rdd2.coalesce(4)
rdd3.collect()
print(rdd3.getNumPartitions())

自定义分区

partitionBy

cogroup, groupWith

cogroup :先各自把相同key的value凑在一起 再把两个rdd的结果以key拼在一起
groupWith: cogroup只支持两个RDD的操作, groupWith支持多个RDD
经常用来统计某个东西在哪些地方出现过 例如
items.cogroup(sale_records) # 某商品哪些人购买过

rdd1 = sc.parallelize(range(1, 10), 1).map(lambda x: (x, chr(ord('A') + x)))
rdd2 = sc.parallelize([random.randint(3, 12) for i in range(1, 10)], 1).map(lambda x: (x, chr(ord('a') + x)))
result = rdd1.cogroup(rdd2, 1)

采用pandas的DataFrame可以方便地打印数据 不用生成价格昂贵的spark的DataFrame

print(DataFrame(rdd1.collect()))
print(DataFrame(rdd2.collect()))
print(DataFrame(rdd3.collect()))
====RDD1====
   0  1
0  1  B
1  2  C
2  3  D
3  4  E
4  5  F
5  6  G
6  7  H
7  8  I
8  9  J
====RDD2====
    0  1
0  12  m
1  12  m
2   4  e
3   8  i
4   3  d
5   5  f
6  12  m
7   4  e
8   7  h
====Result====
    0                1
0   1        ((B), ())
1   2        ((C), ())
2   3       ((D), (d))
3   4    ((E), (e, e))
4   5       ((F), (f))
5   6        ((G), ())
6   7       ((H), (h))
7   8       ((I), (i))
8   9        ((J), ())
9  12  ((), (m, m, m))

glom 按分区打包元素

rdd1.glom().collect()

[[(8, 'e'), (9, 'e'), (2, 'd'), (1, 'd'), (1, 'f'), (2, 'b')], 
[(5, 'f'), (10, 'b'), (8, 'd'), (9, 'f'), (4, 'b'), (2, 'e')], 
[(1, 'd'), (10, 'b'), (7, 'd'), (4, 'f'), (8, 'd'), (4, 'b'), (6, 'd')]]

交集、差集、并集intersection, subtract, union

intersection 两边都有的
subtract rdd1剔除rdd2中有的
union 所有

rdd1 = sc.parallelize([random.randint(1, 10) for x in range(1, 10)], 1)
rdd2 = sc.parallelize([random.randint(1, 10) for x in range(1, 10)], 1)

print(rdd1.collect())
print(rdd2.collect())

print(rdd1.intersection(rdd2).collect())
print(rdd1.subtract(rdd2).collect())
print(rdd1.union(rdd2).collect())

结果

rdd1: [3, 3, 3, 7, 4, 7, 2, 9, 8]
rdd2: [8, 1, 10, 10, 9, 5, 4, 6, 7]

intersection: [4, 8, 7, 9]
subtract: [2, 3, 3, 3]
union: [3, 3, 3, 7, 4, 7, 2, 9, 8, 8, 1, 10, 10, 9, 5, 4, 6, 7]

RDD为空
rdd1.isEmpty()
rdd1.isLocallyCheckpointed()

用现有的数据生成key, 等价于 self.map(lambda x: (f(x), x))
print(rdd1.keyBy(f=lambda x: x + 1).collect())

获取所有key
rdd1.keys()

在这里插入图片描述

发布了6 篇原创文章 · 获赞 0 · 访问量 157

猜你喜欢

转载自blog.csdn.net/refbit/article/details/104109062