pyspark: basic operations

Copyright notice: this is the blogger's original article and may not be reproduced without permission. https://blog.csdn.net/u014281392/article/details/89028273

add-indices

from pyspark import SparkConf, SparkContext
sc = SparkContext()
a = {('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)}   # a Python set, so element order is arbitrary
a
{('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)}
# rdd object
rdd = sc.parallelize(a)
rdd.collect()
[('g4', 8), ('g3', 3), ('g1', 2), ('g2', 4)]
# sort by key
sorted_rdd = rdd.sortByKey()
sorted_rdd.collect()
[('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]
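
sortByKey() orders pairs by key only; to order the same pairs by value, sortBy() (another standard RDD method) takes an arbitrary key function. A minimal sketch:

# sort the (key, value) pairs by their value instead of their key
by_value = rdd.sortBy(lambda kv : kv[1])
by_value.collect()
[('g1', 2), ('g3', 3), ('g2', 4), ('g4', 8)]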
# k, v ---> v, k
# Note: tuple-unpacking lambdas like `lambda (x, y): (y, x)` are Python 2 only;
# in Python 3 they raise a SyntaxError, so index into the tuple instead.
rdd2 = rdd.map(lambda kv : (kv[1], kv[0]))
rdd2.collect()
[(8, 'g4'), (3, 'g3'), (2, 'g1'), (4, 'g2')]
# sort by key in descending order
sorted_rdd = rdd.sortByKey(ascending=False)
sorted_rdd.collect()
[('g4', 8), ('g3', 3), ('g2', 4), ('g1', 2)]
indices_rdd = sorted_rdd.zipWithIndex()
indices_rdd.collect()
[(('g4', 8), 0), (('g3', 3), 1), (('g2', 4), 2), (('g1', 2), 3)]
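
zipWithIndex() nests each original pair under its position; a follow-up map() can flatten the ((key, value), index) tuples into whatever shape you need. A minimal sketch (the name `ranked` is just illustrative):

# flatten ((key, value), index) into (index, key, value)
ranked = indices_rdd.map(lambda kv_i : (kv_i[1], kv_i[0][0], kv_i[0][1]))
ranked.collect()
[(0, 'g4', 8), (1, 'g3', 3), (2, 'g2', 4), (3, 'g1', 2)]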

basic-average

nums = sc.parallelize([1,2,3,4,5,6,7,8,9,0])
nums.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
numAndcount = nums.map(lambda x : (x, 1)).fold((0, 0), (lambda x,y : (x[0] + y[0], x[1] + y[1])))
num_avg = numAndcount[0] / numAndcount[1]   # true division in Python 3, so float() is unnecessary
num_avg
4.5
nums.map(lambda x : (x, 1)).collect()
[(1, 1),
 (2, 1),
 (3, 1),
 (4, 1),
 (5, 1),
 (6, 1),
 (7, 1),
 (8, 1),
 (9, 1),
 (0, 1)]
# fold() merges the (value, 1) pairs: x and y are running (sum, count) accumulators
nums.map(lambda x : (x, 1)).fold((0, 0), lambda x, y : (x[0] + y[0], x[1] + y[1]))
(45, 10)
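
The same average can be computed without materializing the (x, 1) pairs; both mean() and aggregate() are standard RDD methods:

# 1) the built-in mean() action
nums.mean()
4.5
# 2) aggregate() folds elements into (sum, count) per partition, then merges the partials
total, count = nums.aggregate(
    (0, 0),
    lambda acc, x : (acc[0] + x, acc[1] + 1),    # add one element into (sum, count)
    lambda a, b : (a[0] + b[0], a[1] + b[1]))    # merge two (sum, count) pairs
total / count
4.5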

basic-filter

nums = sc.parallelize([1, 3, 4, 5, 6, 7, 8, 2, 6, 7, 23, 35])
nums.collect()
[1, 3, 4, 5, 6, 7, 8, 2, 6, 7, 23, 35]
# keep the odd numbers
odds = nums.filter(lambda x : x%2 == 1)
odds.collect()
[1, 3, 5, 7, 7, 23, 35]
# keep the even numbers
evens = nums.filter(lambda x : x%2 == 0)
evens.collect()
[4, 6, 8, 2, 6]
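
When both halves are needed, a single groupBy() pass (a standard RDD method) avoids scanning the data twice; a minimal sketch:

by_parity = nums.groupBy(lambda x : x % 2 == 0)   # key is True for evens, False for odds
{k : sorted(v) for k, v in by_parity.collect()}
{False: [1, 3, 5, 7, 7, 23, 35], True: [2, 4, 6, 6, 8]}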

basic-join

R_txt = sc.textFile('./R.txt')
R_txt.collect()
['k1,v1', 'k1,v2', 'k2,v3', 'k2,v4', 'k3,v7', 'k3,v8', 'k3,v9']
S_txt = sc.textFile('./S.txt')
S_txt.collect()
['k1,v11', 'k1,v22', 'k1,v33', 'k2,v55', 'k4,v77', 'k5,v88']
r1 = R_txt.map(lambda x : x.split(','))   # split each 'k,v' line into a two-element list
r1.collect()
[['k1', 'v1'],
 ['k1', 'v2'],
 ['k2', 'v3'],
 ['k2', 'v4'],
 ['k3', 'v7'],
 ['k3', 'v8'],
 ['k3', 'v9']]
r2 = r1.flatMap(lambda x : [(x[0], x[1])])   # wrap each row as a one-tuple list; flatMap unwraps it
r2.collect()
[('k1', 'v1'),
 ('k1', 'v2'),
 ('k2', 'v3'),
 ('k2', 'v4'),
 ('k3', 'v7'),
 ('k3', 'v8'),
 ('k3', 'v9')]
s1 = S_txt.map(lambda x : x.split(','))
s1.collect()
[['k1', 'v11'],
 ['k1', 'v22'],
 ['k1', 'v33'],
 ['k2', 'v55'],
 ['k4', 'v77'],
 ['k5', 'v88']]
s2 = s1.flatMap(lambda x : [(x[0], x[1])])
s2.collect()
[('k1', 'v11'),
 ('k1', 'v22'),
 ('k1', 'v33'),
 ('k2', 'v55'),
 ('k4', 'v77'),
 ('k5', 'v88')]
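
Since each split line yields exactly one pair, a plain map() expresses the same conversion without the one-element-list wrapper; flatMap() only matters when a row can produce zero or many outputs:

s2_alt = s1.map(lambda x : (x[0], x[1]))   # same contents as s2
s2_alt.collect()
[('k1', 'v11'), ('k1', 'v22'), ('k1', 'v33'), ('k2', 'v55'), ('k4', 'v77'), ('k5', 'v88')]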
# inner join: only keys present in both RDDs survive
RjoinedS = r2.join(s2)
RjoinedS.collect()
[('k1', ('v1', 'v11')),
 ('k1', ('v1', 'v22')),
 ('k1', ('v1', 'v33')),
 ('k1', ('v2', 'v11')),
 ('k1', ('v2', 'v22')),
 ('k1', ('v2', 'v33')),
 ('k2', ('v3', 'v55')),
 ('k2', ('v4', 'v55'))]
# left outer join: keep every key from r2; unmatched right sides become None
left_joined = r2.leftOuterJoin(s2)
left_joined.collect()
[('k1', ('v1', 'v11')),
 ('k1', ('v1', 'v22')),
 ('k1', ('v1', 'v33')),
 ('k1', ('v2', 'v11')),
 ('k1', ('v2', 'v22')),
 ('k1', ('v2', 'v33')),
 ('k2', ('v3', 'v55')),
 ('k2', ('v4', 'v55')),
 ('k3', ('v7', None)),
 ('k3', ('v8', None)),
 ('k3', ('v9', None))]
# right outer join: keep every key from s2; unmatched left sides become None
right_joined = r2.rightOuterJoin(s2)
right_joined.collect()
[('k1', ('v1', 'v11')),
 ('k1', ('v1', 'v22')),
 ('k1', ('v1', 'v33')),
 ('k1', ('v2', 'v11')),
 ('k1', ('v2', 'v22')),
 ('k1', ('v2', 'v33')),
 ('k2', ('v3', 'v55')),
 ('k2', ('v4', 'v55')),
 ('k4', (None, 'v77')),
 ('k5', (None, 'v88'))]
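
PySpark also provides fullOuterJoin(), which keeps unmatched keys from both sides; a quick sketch on the same RDDs (only the tail of the sorted output is shown):

full_joined = r2.fullOuterJoin(s2)
sorted(full_joined.collect())[-5:]
[('k3', ('v7', None)), ('k3', ('v8', None)), ('k3', ('v9', None)), ('k4', (None, 'v77')), ('k5', (None, 'v88'))]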
