【Spark】RDD

I use PySpark and write and submit Spark jobs from Jupyter. The rest of this post walks through the API directly in code.
Before starting, HDFS, Spark, and Jupyter should already be set up and running.

Start the SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://192.168.48.100:7077")\
        .appName("rdd_demos").getOrCreate()
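If a standalone cluster is not available, the same examples can run with a local master; this variant is an illustrative alternative, not part of the original setup.

# alternative: run everything locally, using all available cores (illustrative)
spark = SparkSession.builder.master("local[*]")\
        .appName("rdd_demos").getOrCreate()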

Create an RDD

# Create an RDD by parallelizing an in-memory collection
arr1 = [1,2,3,4,5,6,7,8,9,10]
rdd1 = spark.sparkContext.parallelize(arr1)
# Create an RDD by loading an external file (here, a path on HDFS)
file = "/spark_demo/wordcount/input/study.txt"
rdd3 = spark.sparkContext.textFile(file)
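Both creation methods accept an optional partition count that controls how the data is split across the cluster. The variant below is an illustrative addition; the values 4 and 2 and the names rdd2 and rdd4 are arbitrary.

# parallelize with an explicit number of partitions (the 4 is arbitrary)
rdd2 = spark.sparkContext.parallelize(arr1, 4)
rdd2.getNumPartitions()   # 4
# textFile with a minimum number of partitions (the 2 is arbitrary)
rdd4 = spark.sparkContext.textFile(file, minPartitions=2)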

Transformations on RDDs

# Check how many partitions the RDD has
rdd1.getNumPartitions()
# Think of the RDD as a stream of elements flowing through each transformation
# map(func): one-to-one, applies the given rule to every element
data_rdd1 = rdd1.map(lambda x: x+1)
# flatMap(func): one-to-many, applies the rule and then flattens the results
data_rdd2 = rdd1.flatMap(lambda x: range(x,4))
# filter(func): keeps only the elements for which func returns True
data_rdd3 = rdd1.filter(lambda x: x!=1)
# distinct(): removes duplicate elements
data_rdd4 = rdd1.distinct()
# sample(withReplacement, fraction): random sampling
data_rdd5 = rdd1.sample(False, 0.5)
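To see what each transformation produces, collect the results. This is an illustrative check against rdd1 = 1..10; note that sample() is random and that element order after distinct() may vary.

print(data_rdd1.collect())   # [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
print(data_rdd2.collect())   # [1, 2, 3, 2, 3, 3]; each x contributes x..3, so x >= 4 contributes nothing
print(data_rdd3.collect())   # [2, 3, 4, 5, 6, 7, 8, 9, 10]
print(data_rdd4.collect())   # 1..10 in some order (the input had no duplicates)
print(data_rdd5.collect())   # roughly half of the elements, chosen at random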

Set operations on RDDs

# Suppose we have two RDDs containing {1,2,3,3} and {3,4,5}
# First build the two RDDs
data1 = spark.sparkContext.parallelize([1,2,3,3])
data2 = spark.sparkContext.parallelize([3,4,5])
# union: all elements of both RDDs (duplicates are kept)
data1.union(data2).collect()
# intersection: elements present in both RDDs (duplicates removed)
data1.intersection(data2).collect()
# subtract: elements of data1 that do not appear in data2
data1.subtract(data2).collect()
# cartesian: Cartesian product, every pair (x, y) with x from data1 and y from data2
data1.cartesian(data2).collect()
# groupBy: group elements by a user-defined key function (here, string length)
a = spark.sparkContext.parallelize(["black", "blue", "white", "green", "grey"])
b = a.groupBy(lambda x: len(x)).collect()
print(b)
sorted([(x,sorted(y)) for (x,y) in b])
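groupBy returns an iterable of values per key (a ResultIterable), which is why the sorted(...) line above converts each group to a list. A slightly more direct variant (an illustrative addition) is to convert with mapValues(list).

# convert each group to a plain list before collecting (key order may vary)
a.groupBy(lambda x: len(x)).mapValues(list).collect()
# e.g. [(4, ['blue', 'grey']), (5, ['black', 'white', 'green'])]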

Actions on RDDs

rdd1.count()                           # number of elements
rdd1.collect()                         # bring all elements back to the driver
rdd1.first()                           # first element
rdd1.countByValue()                    # how many times each value occurs
rdd1.take(2)                           # first 2 elements
rdd1.takeOrdered(2)                    # 2 smallest elements
rdd1.takeOrdered(2, key=lambda x: -x)  # 2 largest elements
rdd1.takeSample(False, 2)              # 2 random elements, without replacement
rdd1.reduce(lambda x,y: x+y)           # fold all elements together, here a sum
rdd1.getNumPartitions()                # number of partitions

A more complex operator: aggregate

seqOp = (lambda x,y: x * y)    # applied within each partition
combOp = (lambda x,y: x + y)   # combines the per-partition results
# the first argument (2) is the zero value; it seeds each partition and the final combine
result = rdd1.aggregate(2, seqOp, combOp)
result
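Because the zero value seeds every partition and then the final combine as well, the result of aggregate depends on how many partitions rdd1 happens to have. Below is a deterministic illustration on a fresh RDD with exactly two partitions; the values and partition count are chosen here only for clarity.

nums = spark.sparkContext.parallelize([1, 2, 3, 4], 2)    # partitions: [1, 2] and [3, 4]
# per-partition products, each seeded with the zero value 2: 2*1*2 = 4 and 2*3*4 = 24
# final combine, again seeded with 2: 2 + 4 + 24 = 30
nums.aggregate(2, lambda x, y: x * y, lambda x, y: x + y)   # 30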

Creating a Pair RDD

# There are several ways to create a Pair RDD.

# Option 1: load a text file, then transform it into a Pair RDD.
file = "/spark_demo/wordcount/input/study.txt"
lines = spark.sparkContext.textFile(file)
pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1))
pairRDD.collect()

# Option 2: create a Pair RDD from a parallelized collection
rdd = spark.sparkContext.parallelize(["Hadoop","Spark","Hive","Spark"])
pairRDD = rdd.map(lambda word: (word,1))
pairRDD.collect()
# keyBy(): derive each element's key with a user-defined function
a = spark.sparkContext.parallelize(["black", "blue", "white", "green", "grey"])

# keyBy applies the given function to every element to produce its key and returns a pair RDD of (key, element)
b = a.keyBy(lambda x: len(x))
b.collect()

# Option 3: create a pair RDD directly from a list of tuples
pets = spark.sparkContext.parallelize([("cat",1),("dog",1),("cat",2)])
pets.collect()
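The pets RDD has a duplicate key, which makes the per-key operators of the next section easy to verify. The checks below are an illustrative addition; the order of keys in the output may vary.

pets.reduceByKey(lambda x, y: x + y).collect()   # [('cat', 3), ('dog', 1)]
pets.groupByKey().mapValues(list).collect()      # [('cat', [1, 2]), ('dog', [1])]
pets.sortByKey().collect()                       # sorted by key: [('cat', 1), ('cat', 2), ('dog', 1)]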

Pair RDD transformations

# reduceByKey(func): merge the values of each key using func
pairRDD.reduceByKey(lambda x,y: x + y).collect()
# groupByKey(): group all values that share a key
pairRDD.groupByKey().collect()
# keys(): return all keys
pairRDD.keys().collect()
# values(): return all values
pairRDD.values().collect()

# sortByKey(): sort by key, ascending by default
pairRDD.sortByKey(ascending=False).collect()

# mapValues(func): apply func to every value, leaving the keys unchanged
pairRDD.mapValues(lambda x: x*x).collect()
# flatMapValues(func): apply func to every value and flatten the results, pairing each result with the original key
pairRDD.flatMapValues(lambda x: range(x,6)).collect()
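Since every value in pairRDD is 1, the effect of mapValues and flatMapValues is easier to see on a pair RDD with varied values; kv below is an illustrative RDD, not part of the original.

kv = spark.sparkContext.parallelize([("a", 2), ("b", 4)])
kv.mapValues(lambda x: x * x).collect()            # [('a', 4), ('b', 16)]
kv.flatMapValues(lambda x: range(x, 6)).collect()  # [('a', 2), ('a', 3), ('a', 4), ('a', 5), ('b', 4), ('b', 5)]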

A more complex operator: combineByKey()


# combineByKey(createCombiner, mergeValue, mergeCombiners)
data = spark.sparkContext.parallelize([("company-1",92),("company-1",85),("company-1",82),
                   ("company-1",93),("company-1",86),("company-1",83),
                   ("company-2",78),("company-2",96),("company-2",85),
                   ("company-3",88),("company-3",94),("company-3",80)],3)
cbk = data.combineByKey(
            lambda income: (income,1),                  # createCombiner: the first value of a key becomes (sum, count)
            lambda t,income: (t[0]+income, t[1]+1),     # mergeValue: fold another value of the same key into the accumulator
            lambda t1,t2: (t1[0]+t2[0], t1[1]+t2[1])    # mergeCombiners: merge accumulators of the same key from different partitions
        )
cbk.collect()     # (total income, number of records) per company

# (company, total income, average income) per company
cbk.map(lambda t: (t[0],t[1][0],t[1][0]/float(t[1][1]))).collect()
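For comparison, the same per-company average can also be computed with mapValues plus reduceByKey. This alternative is an added sketch, not part of the original, and the order of companies in the output may vary.

avg = (data.mapValues(lambda income: (income, 1))
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
           .mapValues(lambda t: t[0] / float(t[1])))
avg.collect()   # approximately [('company-1', 86.83), ('company-2', 86.33), ('company-3', 87.33)]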
# groupByKey
x = spark.sparkContext.parallelize([
    ("USA", 1), ("USA", 2), ("India", 1),
    ("UK", 1), ("India", 4), ("India", 9),
    ("USA", 8), ("USA", 3), ("India", 4),
    ("UK", 6), ("UK", 9), ("UK", 5)], 4)
 
# groupByKey with the default number of partitions
y = x.groupByKey()

# check the number of partitions
print('number of partitions: ', y.getNumPartitions())

# groupByKey with an explicit number of partitions
y = x.groupByKey(2)
print('number of partitions: ', y.getNumPartitions())

# print the grouped result
for t in y.collect():
    print(t[0], [v for v in t[1]])
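groupByKey ships every individual value across the network; when the goal is a per-key aggregate, reduceByKey performs the same grouping while combining values inside each partition first. The sum below is an added illustration of that alternative.

x.reduceByKey(lambda a, b: a + b).collect()   # [('USA', 14), ('India', 18), ('UK', 21)], order may vary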

A more complex operator: aggregateByKey

# Create the pair RDD student_rdd from key-value records
student_rdd = spark.sparkContext.parallelize([
          ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), 
          ("Joseph", "Biology", 82),   ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), 
          ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), 
          ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
          ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), 
          ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), 
          ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), 
          ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
          ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), 
          ("Juan", "Biology", 60)], 2)
    
# Define the sequence operation and the combiner operation
# Sequence operation: find the maximum grade within a single partition
def seq_op(accumulator, element):
    if(accumulator > element[1]):
        return accumulator 
    else: 
        return element[1]
 
 
# Combiner operation: find the maximum grade across the per-partition accumulators
def comb_op(accumulator1, accumulator2):
    if(accumulator1 > accumulator2):
        return accumulator1 
    else:
        return accumulator2
 

# Here the zero value is 0, because we are looking for the maximum grade and every grade is non-negative
zero_val = 0
aggr_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))).aggregateByKey(zero_val, seq_op, comb_op) 
 
# Print the output
for tpl in aggr_rdd.collect():
    print(tpl)
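aggregateByKey can also carry a richer accumulator. As an added sketch (not part of the original exercise), the snippet below averages each student's grades by using a (sum, count) pair as the zero value.

avg_rdd = (student_rdd
           .map(lambda t: (t[0], t[2]))
           .aggregateByKey((0, 0),
                           lambda acc, grade: (acc[0] + grade, acc[1] + 1),   # within a partition
                           lambda a, b: (a[0] + b[0], a[1] + b[1]))           # across partitions
           .mapValues(lambda t: t[0] / float(t[1])))
for tpl in avg_rdd.collect():
    print(tpl)   # e.g. ('Joseph', 82.5), ('Jimmy', 77.0), ...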

Close the SparkSession and stop the Spark job

spark.stop()

Simple statistical methods on an RDD

(These examples reuse rdd1, so they need a live SparkSession; run them before calling spark.stop().)

rdd1.sum()              # sum
rdd1.max()              # maximum
rdd1.min()              # minimum
rdd1.mean()             # mean
rdd1.count()            # number of elements
rdd1.variance()         # population variance
rdd1.sampleVariance()   # sample variance
rdd1.stdev()            # population standard deviation
rdd1.sampleStdev()      # sample standard deviation

# histogram
# Option 1: pass explicit bucket boundaries; returns (boundaries, count of elements in each bucket)
# rdd1.histogram([1.0, 10.0, 20.9])
rdd1.histogram([1.0, 8.0, 20.9])   # for rdd1 = 1..10 this gives ([1.0, 8.0, 20.9], [7, 3])

# Option 2: pass the number of evenly spaced buckets between the min and max
rdd1.histogram(3)
# stats() returns a StatsCounter object that holds all of these statistics at once
status = rdd1.stats()

print(status.count())
print(status.mean())
print(status.stdev())
print(status.max())
print(status.min())
print(status.sum())
print(status.variance())
print(status)
spark.stop()

Persistence (caching) and shared variables
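As a minimal sketch of the topic named above (illustrative only, and assuming a SparkSession named spark is still running): persist()/cache() keep an RDD's partitions in memory across actions, a broadcast variable ships a read-only value to every executor, and an accumulator collects counts back to the driver.

from pyspark import StorageLevel

nums = spark.sparkContext.parallelize(range(1, 11))
nums.persist(StorageLevel.MEMORY_ONLY)   # equivalent to nums.cache()
nums.count()                             # the first action materializes and caches the RDD
nums.sum()                               # later actions reuse the cached partitions

factor = spark.sparkContext.broadcast(10)   # read-only shared variable
acc = spark.sparkContext.accumulator(0)     # counter written by executors, read on the driver

def scale(x):
    acc.add(1)                  # count processed elements
    return x * factor.value     # use the broadcast value

nums.map(scale).collect()
print(acc.value)                # 10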

Reposted from blog.csdn.net/weixin_41774099/article/details/103162441