RDD常用操作
-
transformation[转换(从已经存在的数据集中创建新的数据集)]
RDDA-------transformation------>RDDB
例:rddb=rdda.map(....)
map/filter/group by/distinct/...
转换是惰性的(lazy),并不会立即计算结果,只会记住作用在数据集上的转换操作
只有当转换遇到action时,才会真正开始计算
-
action[动作(返回一个对数据集操作之后的值)]
动作触发计算
count/reduce/collect/.....
rdda.map(....).....collect()
(不管前面有多少个转换(例:map)都不会执行,只有等到动作(例:collect)出现时才会真正计算)
RDD常用Transformation算子编程
map(func)映射,将func函数作用到数据集的每一个元素上,生成一个新的分布式的数据集返回
filter(func)过滤,选出所有func返回值为true的元素,生成一个新的分布式的数据集返回
flatMap(func)与map类似,但每个输入item可以被映射为0个或多个输出items,因此func应返回一个Sequence(序列),结果会被展平成一个数据集
groupByKey()把相同key的数据分发到一起
例:输入单词 ['hello', 'spark', 'hello', 'world', 'hello', 'world'],先map成(单词,1)再groupByKey,
把每组转成{key: list(values)}后的结果:[{'world': [1, 1]}, {'hello': [1, 1, 1]}, {'spark': [1]}]
reduceByKey(func):把相同key的数据分发到一起,并用func对同一key的values进行聚合计算
mapRdd.reduceByKey(lambda a,b:a+b)
world: [1,1] 1+1=2
spark: [1] 1=1
hello: [1,1,1] 1+1+1=3
sortBy(func, ascending=True) 按func返回的值排序(例如按元组的第几个元素),默认升序,传入ascending=False则降序
reduceByKeyRdd.sortBy(lambda x: x[1])
union(rdd)合并
[1,2,3]+[3,4,5]=[1,2,3,3,4,5]
distinct()去重
[1,2,3,3,4,5].distinct()=[1,2,3,4,5]
join(rdd):按key连接两个(key, value)形式的RDD
inner join:join
outer join:leftOuterJoin/rightOuterJoin/fullOuterJoin
RDD常用action算子编程
collect,count,take,reduce,saveAsTextFile,foreach
在服务器上运行测试
bin/spark-submit --master local[2] --name spark0402 /opt/datas/helloWord.py file:///opt/datas/spark_readme.txt file:///opt/datas/wc
在PyCharm中测试时,可以在 Run → Edit Configurations 的 Parameters 中添加脚本参数(例如输入文件路径)
今日完整代码
import os
import sys
from pyspark import SparkConf,SparkContext
# #创建sparkconf:设置的是spark相关的参数信息
# conf=SparkConf().setMaster("local[2]").setAppName("spark0301")
# #创建sparkcontext
# sc=SparkContext(conf=conf)
#
# #业务逻辑
# data=[1,3,44,5]
# distData=sc.parallelize(data)
# print(distData.collect())
#
# sc.stop()
if __name__ == "__main__":
    # Spark configuration: run locally with 2 worker threads under the
    # application name "spark0301".
    # NOTE(review): settings made on SparkConf in code take precedence over
    # spark-submit flags, so e.g. `--name spark0402` would not win — confirm.
    conf = (
        SparkConf()
        .setMaster("local[2]")
        .setAppName("spark0301")
    )
    # Entry point shared by the demo functions; this `if` block does not
    # open a new scope, so `sc` is a module-level global.
    sc = SparkContext(conf=conf)
def my_map():
    """Double every element of a small list with ``map``.

    Returns:
        The collected result, ``[2, 4, 6, 8, 10]``.
    """
    doubled = sc.parallelize([1, 2, 3, 4, 5]).map(lambda n: n * 2)
    return doubled.collect()
def my_map2():
    """Pair every animal name with the count 1.

    Returns:
        ``[('dog', 1), ('tiger', 1), ('lion', 1), ('cat', 1),
        ('panther', 1), ('eagle', 1)]``
    """
    animals = ['dog', "tiger", "lion", "cat", "panther", "eagle"]
    pairs = sc.parallelize(animals).map(lambda name: (name, 1))
    return pairs.collect()
def my_filter():
    """Double each element, then keep only the values greater than 5.

    Returns:
        ``[6, 8, 10]``
    """
    numbers = sc.parallelize([1, 2, 3, 4, 5])
    return (
        numbers
        .map(lambda n: n * 2)
        .filter(lambda n: n > 5)
        .collect()
    )
def my_flatMap():
    """Split each line on single spaces and flatten the pieces into one RDD.

    Returns:
        ``['hello', 'spark', 'hello', 'world', 'hello', 'world']``
    """
    lines = ['hello spark', "hello world", "hello world"]
    words = sc.parallelize(lines).flatMap(lambda text: text.split(" "))
    return words.collect()
def my_groupBy():
    """Group ``(word, 1)`` pairs by word with ``groupByKey``.

    Prints two intermediate results:
      * the raw grouped pairs, whose values are ``ResultIterable`` objects;
      * the per-word sums, e.g. ``[{'world': 2}, {'hello': 3}, {'spark': 1}]``.

    Returns:
        One single-entry dict per word mapping it to its list of ones, e.g.
        ``[{'world': [1, 1]}, {'hello': [1, 1, 1]}, {'spark': [1]}]``.
        The word order is not the input order.
    """
    sentences = ['hello spark', "hello world", "hello world"]
    pairs = (
        sc.parallelize(sentences)
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
    )
    grouped = pairs.groupByKey()
    print(grouped.collect())
    print(grouped.map(lambda kv: {kv[0]: sum(kv[1])}).collect())
    return grouped.map(lambda kv: {kv[0]: list(kv[1])}).collect()
def my_reduceByKey():
    """Count words by summing their ``(word, 1)`` pairs with ``reduceByKey``.

    Returns:
        ``[('world', 2), ('hello', 3), ('spark', 1)]`` -- one
        ``(word, count)`` tuple per distinct word. The order is not the
        input order.
    """
    sentences = ['hello spark', "hello world", "hello world"]
    counts = (
        sc.parallelize(sentences)
        .flatMap(lambda line: line.split(" "))
        .map(lambda word: (word, 1))
        .reduceByKey(lambda left, right: left + right)
    )
    return counts.collect()


# Requirement (addressed by my_sort): order the word-count result by the
# number of occurrences, in descending order.
def my_sort():
    """Word count sorted by occurrence count, most frequent first.

    Requirement: order the word-count result by the number of occurrences
    in descending order.

    Returns:
        ``[('hello', 3), ('world', 2), ('spark', 1)]``
    """
    data = ['hello spark', "hello world", "hello world"]
    rdd = sc.parallelize(data)
    mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
    # sortBy sorts ascending by default; the requirement asks for the most
    # frequent words first, hence ascending=False.
    return reduceByKeyRdd.sortBy(lambda x: x[1], ascending=False).collect()
def union():
    """Concatenate two RDDs with ``union``; duplicates are kept.

    Returns:
        ``[1, 2, 3, 3, 4, 5]``
    """
    left = sc.parallelize([1, 2, 3])
    right = sc.parallelize([3, 4, 5])
    combined = left.union(right)
    return combined.collect()
def my_distinct():
    """Union two RDDs and drop duplicate elements with ``distinct``.

    Returns:
        The unique values 1, 2, 3 and 4. The order of the collected list
        is not guaranteed.
    """
    first = sc.parallelize([1, 2, 3])
    second = sc.parallelize([3, 4, 2])
    merged = first.union(second)
    return merged.distinct().collect()
def my_join():
    """Show the four pair-RDD joins on two small keyed datasets.

    Key overlap:
      * A and C exist on both sides;
      * B, D and F exist only on the left;
      * E exists only on the right.

    The results are printed in this order: join, leftOuterJoin,
    rightOuterJoin, fullOuterJoin.

        [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3'))]
        [('A', ('a1', 'a2')), ('B', ('b1', None)), ('F', ('f1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None))]
        [('A', ('a1', 'a2')), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('E', (None, 'e1'))]
        [('A', ('a1', 'a2')), ('B', ('b1', None)), ('F', ('f1', None)), ('C', ('c1', 'c2')), ('C', ('c1', 'c3')), ('D', ('d1', None)), ('E', (None, 'e1'))]

    Returns:
        Always ``True``.
    """
    left = sc.parallelize([("A", "a1"), ("B", "b1"), ("C", "c1"), ("D", "d1"), ("F", "f1")])
    right = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
    # Bound methods: each join RDD is only built when called, in print order.
    for join_with in (left.join, left.leftOuterJoin, left.rightOuterJoin, left.fullOuterJoin):
        print(join_with(right).collect())
    return True
def my_action():
    """Exercise common RDD actions on the numbers 1..10.

    Steps:
      1. ``foreach`` prints every element. The order follows the
         partitioning; one local[2] run printed 1 6 7 8 9 10 2 3 4 5.
      2. One line is printed with the results of collect, count, take(3),
         min, max and reduce(+):
         ``[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 10 [1, 2, 3] 1 10 55``

    Returns:
        None.
    """
    numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    numbers.foreach(print)
    summary = (
        numbers.collect(),
        numbers.count(),
        numbers.take(3),
        numbers.min(),
        numbers.max(),
        numbers.reduce(lambda x, y: x + y),
    )
    print(*summary)
def word_count():
    """Word count over the text file passed as the single CLI argument.

    Steps:
        1. flatMap: split every line into words.
        2. map: word -> (word, 1).
        3. reduceByKey: add up the ones for identical words.

    Prints one ``word,count`` line per distinct word.

    Raises:
        SystemExit: with code -1 (after printing a usage line to stderr)
            when ``sys.argv`` does not hold exactly one argument.
    """
    if len(sys.argv) != 2:
        # file= is a keyword of print(); it used to sit inside the message.
        print("Usage: wordcount <input>", file=sys.stderr)
        sys.exit(-1)
    counts = (
        sc.textFile(sys.argv[1])
        # split() with no argument splits on runs of whitespace and drops
        # empty strings, so blank lines and double spaces don't count ''.
        .flatMap(lambda line: line.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )
    for word, count in counts.collect():
        print(f"{word},{count}")
def word_count_save_file():
    """Word count over ``argv[1]``, written as text files to ``argv[2]``.

    Steps:
        1. flatMap: split every line into words.
        2. map: word -> (word, 1).
        3. reduceByKey: add up the ones for identical words.
        4. saveAsTextFile: save the ``(word, count)`` pairs.

    NOTE(review): Hadoop output formats normally refuse to write into an
    existing directory — use a fresh output path.

    Raises:
        SystemExit: with code -1 (after printing a usage line to stderr)
            when ``sys.argv`` does not hold exactly input and output paths.
    """
    if len(sys.argv) != 3:
        # file= is a keyword of print(); the message also names <output>.
        print("Usage: wordcount <input> <output>", file=sys.stderr)
        sys.exit(-1)
    input_path, output_path = sys.argv[1], sys.argv[2]
    (
        sc.textFile(input_path)
        # split() drops empty tokens from blank lines / repeated spaces.
        .flatMap(lambda line: line.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
        .saveAsTextFile(output_path)
    )
def topN(n=5):
    """Count the second space-separated field of each line and return the top ``n``.

    Input lines are expected to look like ``time id url from ...``. The
    ``id`` field (index 1) is counted, and the ``n`` most frequent
    ``(id, count)`` pairs are returned, highest count first.

    Args:
        n: How many pairs to return; defaults to 5 (the original behaviour).

    Returns:
        A list of at most ``n`` ``(value, count)`` tuples.

    Raises:
        SystemExit: with code -1 (after printing a usage line to stderr)
            when ``sys.argv`` does not hold exactly one argument.
    """
    if len(sys.argv) != 2:
        # file= is a keyword of print(); it used to sit inside the message.
        print("Usage: topN <input>", file=sys.stderr)
        sys.exit(-1)
    return (
        sc.textFile(sys.argv[1])
        .map(lambda line: line.split(" "))
        # Blank or single-field lines have no index 1; without this filter
        # they raise IndexError inside a task and fail the whole job.
        .filter(lambda fields: len(fields) > 1)
        .map(lambda fields: (fields[1], 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda pair: pair[1], ascending=False)
        .take(n)
    )
def avg_age():
    """Compute the total, count and mean of a fixed sample of 100 ages.

    Returns:
        A ``(totalAge, count, avgAge)`` tuple. ``avgAge`` uses true
        division, so it is a float.
    """
    ages = [
        17, 57, 71, 16, 50, 76, 2, 75, 22, 61,
        31, 41, 33, 53, 59, 9, 52, 9, 68, 45,
        9, 55, 47, 60, 15, 71, 57, 32, 37, 3,
        77, 29, 59, 51, 39, 63, 31, 53, 19, 31,
        35, 38, 36, 67, 27, 59, 67, 66, 22, 38,
        16, 72, 73, 32, 76, 2, 74, 37, 32, 2,
        22, 3, 65, 3, 35, 48, 53, 35, 10, 72,
        39, 5, 80, 1, 74, 6, 2, 11, 80, 80,
        14, 51, 18, 26, 70, 44, 47, 49, 80, 11,
        78, 69, 72, 65, 78, 49, 12, 74, 64, 41,
    ]
    ageData = sc.parallelize(ages)
    totalAge = ageData.reduce(lambda a, b: a + b)
    count = ageData.count()
    return totalAge, count, totalAge / count
if __name__ == "__main__":
    import traceback

    # Run the currently selected demo (swap avg_age() for another function
    # to try it) and always release the SparkContext afterwards.
    try:
        print(avg_age())
    except Exception:
        # Keep the best-effort behaviour (no crash), but report the full
        # traceback on stderr instead of only str(e) on stdout.
        traceback.print_exc()
    finally:
        # Runs even if the demo raises or calls sys.exit().
        sc.stop()