生成RDD
rdd1 = sc.parallelize([['zhangsan', 'M', 29], ['lisi', 'F', 22], ['wangwu', 'M', 30]])
rdd2 = sc.textFile(r'E:\常用基础数据\po.csv')
collect、collectAsMap、first
collect获取全部rdd元素, 生成list
collectAsMap获取键值对, 如果RDD中同一个Key中存在多个Value,后面的value将会把前面的value覆盖
first 获取第一个元素
rdd1_list = rdd1.collect()
sample
withReplacement 取出后是否放回
fraction 取出比例
seed 随机数, 避免同一个程序里面多个随机取值太过相似
返回值为一个新的rdd
rdd1_sample = rdd1.sample(False, 0.6, 1)
print(rdd1_sample.collect())
sort 排序
ascending 正序排序, 默认为正序
numPartitions 分区数, 不知道干嘛的
keyfunc 获取key, 默认为tuple或者list的第一个
rdd1_sorted = rdd1.sortByKey(ascending=False)
rdd1_sorted2 = rdd1.sortBy(ascending=False, keyfunc=lambda x: x[1])
print(rdd1_sorted.collect())
print(rdd1_sorted2.collect())
map、flatMap, mapValues ,flatMapValues
对每一个元素进行处理, 参数为rdd中的元素个体, flatMap将map扁平化flat()
preservesPartitioning 是否保留父rdd的partitioner分区信息, 不知所云
rdd1_map = rdd1.map(lambda x: x[2])
print(rdd1_map.collect())
rdd2 = sc.parallelize([
[1, 'yingyu', 89],
[1, 'shuxue', 99],
[1, 'zhengzhi', 78],
[2, 'shuxue', 78],
[2, 'zhengzhi', 89],
[3, 'jisuanji', 97],
[3, 'huaxue', 67],
[100, 'shengwu', 68]
])
当key-value类型的rdd, 对每一个key中的value进行操作. key不变. 可以配合groupby
print(rdd1.mapValues(lambda x: len(x)))
比起上面的多一个拆分动作
print(rdd1.flatMapValues(lambda x: len(x))
reduce、reduceByKey、fold、foldByKey
reduce 不解释
fold是提供了初始值的reduce
关联与join
cartesian笛卡尔积, rdd1中的单个元素作为整体分别和rdd2全关联
join 两边都有的
rdd1_rdd2_join = rdd1.join(rdd2)
print(rdd1_rdd2_join.collect())
outerJoin, 数据最全, 没有的就关联空
rdd1.fullOuterJoin(rdd2)
leftOuterJoin 左边的都有, 不管右边有没有
rdd1_rdd2_left_outer_join = rdd1.leftOuterJoin(rdd2)
rightOuterJoin 右边的都有
rdd1_rdd2_left_outer_join = rdd1.rightOuterJoin(rdd2)
示例
rdd1 = sc.parallelize([[1, 'A'], [2, 'B'], [3, 'C'], [4, 'D'], [5, 'E']])
rdd2 = sc.parallelize([[1, 'a'], [1, 'b'], [3, 'c'], [4, 'd'], [8, 'e']])
result1 = rdd1.cartesian(rdd2).collect()
result2 = rdd1.join(rdd2).collect()
result3 = rdd1.fullOuterJoin(rdd2).collect()
result4 = rdd1.leftOuterJoin(rdd2).collect()
result5 = rdd1.rightOuterJoin(rdd2).collect()
结果
[([1, 'A'], [1, 'a']), ([1, 'A'], [1, 'b']), ([1, 'A'], [3, 'c']), ([1, 'A'], [4, 'd']), ([1, 'A'], [8, 'e']), ([2, 'B'], [1, 'a']), ([2, 'B'], [1, 'b']), ([2, 'B'], [3, 'c']), ([2, 'B'], [4, 'd']), ([2, 'B'], [8, 'e']), ([3, 'C'], [1, 'a']), ([3, 'C'], [1, 'b']), ([3, 'C'], [3, 'c']), ([3, 'C'], [4, 'd']), ([3, 'C'], [8, 'e']), ([4, 'D'], [1, 'a']), ([4, 'D'], [1, 'b']), ([4, 'D'], [3, 'c']), ([4, 'D'], [4, 'd']), ([4, 'D'], [8, 'e']), ([5, 'E'], [1, 'a']), ([5, 'E'], [1, 'b']), ([5, 'E'], [3, 'c']), ([5, 'E'], [4, 'd']), ([5, 'E'], [8, 'e'])]
[(1, ('A', 'a')), (1, ('A', 'b')), (3, ('C', 'c')), (4, ('D', 'd'))]
[(1, ('A', 'a')), (1, ('A', 'b')), (2, ('B', None)), (3, ('C', 'c')), (4, ('D', 'd')), (5, ('E', None)), (8, (None, 'e'))]
[(1, ('A', 'a')), (1, ('A', 'b')), (2, ('B', None)), (3, ('C', 'c')), (4, ('D', 'd')), (5, ('E', None))]
[(1, ('A', 'a')), (1, ('A', 'b')), (3, ('C', 'c')), (4, ('D', 'd')), (8, (None, 'e'))]
sum
求和, 用之前检查是否全是数字类型
rdd1.map(lambda x: x[0]).sum()
计数
count 对元素个数进行计数
rdd1.count()
countApprox 不精确计数, 当超过timeout时, 返回一个未完成的结果。
print(rdd1.countApprox(timeout=50), 1)
countApproxDistinct 不精确计数, 参数是大概的误差指数 数据量*误差指数
print(rdd1.countApproxDistinct(0.01))
去重
去重, 实际上调用的是reduceByKey方法(做了一次WordCount)
print(rdd1.distinct().sortByKey().collect())
filter
取出满足参数的回调函数的数据, 生成一个新的RDD
print(rdd1.filter(lambda x: x[0] > 10))
聚合函数
def fun1(x1, x2) -> int:
print('fun1 %d %d' % (x1, x2))
return x1 + x2
def fun2(x1, x2) -> int:
print('fun2 %d %d' % (x1, x2))
return x1 + x2
aggregateByKey, aggregate
对于每一个分区每一个元素先执行fun1, 前面的执行结果作为后面的参数1
对于每一个分区, 把fun1执行的结果作为单个元素, 对于每一个分区计算结果先执行fun2, 前面的执行结果作为后面的参数1
例:
result1 = rdd1.aggregateByKey(zeroValue=100, seqFunc=fun1, combFunc=fun2)
结果
fun1 0 1
fun1 1 2
[Stage 1:===================> (1 + 1) / 3]
fun1 0 3
fun1 3 4
[Stage 1:===================> (2 + 1) / 3]
fun1 0 5
fun1 5 6
fun1 11 7
fun1 18 8
fun2 0 3
fun2 3 7
fun2 10 26
checkpoint、getCheckpointFile
‘’’
对于cache,若机器发生故障,内存或者磁盘中缓存的数据丢失时,就要根据lineage(血统)进行数据恢复,想象一下,如果在这之前有100个rdd,那么在要经过100次的转换,才能将数据恢复过来,这样效率非常低。
所以可以使用rdd的checkpoint机制(检查点,相当于快照),将你认为很重要的rdd存放到一个公共的高可用的存储系统中去,如hdfs,下次数据丢失时,就可以从前面ck的rdd直接进行数据恢复,而不需要根据lineage去从头一个一个的去恢复,这样极大地提高了效率。
首先要设置ck的存放的目录:
sc.setCheckpointDir(“hdfs://master:9000/rdd-checkpoint”) //使用hdfs做存储,如果文件目录不存在会创建一个新的
创建好后hdfs中会生成一个rdd-checkpoint目录,里面还会自动生成一个目录:
最好配合cache使用
‘’’
rdd1.cache().checkpoint()
cache、persist、getStorageLevel
cache 实际调用 self.persist(StorageLevel.MEMORY_ONLY)
getStorageLevel 返回RDD当前的存储级别,存储级别一旦确定,就不能再修改了。
StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
rdd1.cache()
rdd1.persist(StorageLevel.MEMORY_AND_DISK)
对于RDD的partition的控制
‘’’
repartition这个方法可以增加或减少此RDD中的并行度。在内部,这使用shuffle来重新分配数据。
如果要减少RDD中的分区数量,请考虑使用“coalesce”,这样可以避免执行shuffle。
增加用repartition, 减少用coalesce
‘’’
rdd2 = rdd1.repartition(10)
print(rdd2.groupBy(lambda x: x[1][1]).reduceByKey(lambda x1, x2: x1).collect())
rdd2.collect()
print(rdd2.getNumPartitions())
rdd3 = rdd2.coalesce(4)
rdd3.collect()
print(rdd3.getNumPartitions())
自定义分区
partitionBy
cogroup, groupWith
cogroup :先各自把相同key的value凑在一起 再把两个rdd的结果以key拼在一起
groupWith: cogroup只支持两个RDD的操作, groupWith支持多个RDD
经常用来统计某个东西在哪些地方出现过 例如
items.cogroup(sale_records) # 某商品哪些人购买过
rdd1 = sc.parallelize(range(1, 10), 1).map(lambda x: (x, chr(ord('A') + x)))
rdd2 = sc.parallelize([random.randint(3, 12) for i in range(1, 10)], 1).map(lambda x: (x, chr(ord('a') + x)))
result = rdd1.cogroup(rdd2, 1)
采用pandas的DataFrame可以方便地打印数据 不用生成价格昂贵的spark的DataFrame
print(DataFrame(rdd1.collect()))
print(DataFrame(rdd2.collect()))
print(DataFrame(rdd3.collect()))
====RDD1====
0 1
0 1 B
1 2 C
2 3 D
3 4 E
4 5 F
5 6 G
6 7 H
7 8 I
8 9 J
====RDD2====
0 1
0 12 m
1 12 m
2 4 e
3 8 i
4 3 d
5 5 f
6 12 m
7 4 e
8 7 h
====Result====
0 1
0 1 ((B), ())
1 2 ((C), ())
2 3 ((D), (d))
3 4 ((E), (e, e))
4 5 ((F), (f))
5 6 ((G), ())
6 7 ((H), (h))
7 8 ((I), (i))
8 9 ((J), ())
9 12 ((), (m, m, m))
glom 按分区打包元素
rdd1.glom().collect()
[[(8, 'e'), (9, 'e'), (2, 'd'), (1, 'd'), (1, 'f'), (2, 'b')],
[(5, 'f'), (10, 'b'), (8, 'd'), (9, 'f'), (4, 'b'), (2, 'e')],
[(1, 'd'), (10, 'b'), (7, 'd'), (4, 'f'), (8, 'd'), (4, 'b'), (6, 'd')]]
交集、差集、并集intersection, subtract, union
intersection 两边都有的
subtract rdd1剔除rdd2中有的
union 所有
rdd1 = sc.parallelize([random.randint(1, 10) for x in range(1, 10)], 1)
rdd2 = sc.parallelize([random.randint(1, 10) for x in range(1, 10)], 1)
print(rdd1.collect())
print(rdd2.collect())
print(rdd1.intersection(rdd2).collect())
print(rdd1.subtract(rdd2).collect())
print(rdd1.union(rdd2).collect())
结果
rdd1: [3, 3, 3, 7, 4, 7, 2, 9, 8]
rdd2: [8, 1, 10, 10, 9, 5, 4, 6, 7]
intersection: [4, 8, 7, 9]
subtract: [2, 3, 3, 3]
union: [3, 3, 3, 7, 4, 7, 2, 9, 8, 8, 1, 10, 10, 9, 5, 4, 6, 7]
RDD为空
rdd1.isEmpty()
rdd1.isLocallyCheckpointed()
用现有的数据生成key, 等价于 self.map(lambda x: (f(x), x))
print(rdd1.keyBy(f=lambda x: x + 1).collect())
获取所有key
rdd1.keys()