Spark,总结和帮助记忆常用action算子和Transformation算子

大家好!

环境:三台虚拟机服务器

运行:在window,pycharm远程连接node1,node2,node3.

分布式处理数据加快处理速度!

内容有点干而且长,建议大伙收藏慢慢品尝!

最近在学习spark,我总结一下我在spark学习中,常用的action算子和Trans'formation算子

还有就是把有相似功能的算子写在一起方便我们记忆。

话不多说哈!我直接上内容,并且把一些示例代码提供给大家,方便大家理解和学习,也帮助我加强记忆!

这些代码不能直接run的哦!需要配置好环境才行!

一、

1.map算子:对RDD数据进行处理,返回新的RDD

"""
演示RDD的map方法使用;
map方法就是对rdd对象的运算,由map决定如何运算
"""
import findspark

findspark.init()
from pyspark import SparkContext, SparkConf

# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD对象
rdd = sc.parallelize([1, 2, 3, 4, 5])


# 通过map方法将全部数据加载且乘以10
def func(data):
    return data * 10


# 使用map方法改变了原来的RDD对象就要用一个新的rdd对象去接收
# 根据老师说的原理是,一个一个的把列表的元素传进去函数,然后一个一个的乘以10,最后把这个列表输出
# 有两种方法实现map方法的使用,一种是用函数,传入函数名字,另一种是用lambda函数
# rdd2 = rdd.map(func)
# 因为用map方法对RDD对象进行运算返回的也是rdd对象,所以要连续改变的可以继续使用其他方法,例如下面:
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 10)
# f: (T) -> U, 什么类型都行,返回数据就行
# f: (T) -> T, 什么类型就返回什么数据类型
print(rdd2.collect())

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

flatMap算子:对RDD数据进行处理,然后进行解除嵌套操作

"""
演示RDD的flatMap方法使用
flatMap方法承载了map方法的使用外,还增加解开嵌套数据的功能
"""
import findspark

findspark.init()
from pyspark import SparkContext, SparkConf

# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize(["itcast good 666","itcast python 123","python itcast good"])
# 使用map方法和flatmap方法运算嵌套数据对比
rdd1 = rdd.map(lambda x: x.split(" "))
print(rdd1.collect())
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
# 输出结果
# [['itcast', 'good', '666'], ['itcast', 'python', '123'], ['python', 'itcast', 'good']]
# ['itcast', 'good', '666', 'itcast', 'python', '123', 'python', 'itcast', 'good']
# 查看输出的rdd对象数据类型
print(type(rdd1.collect()))
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

mapValues算子:也是处理型算子,不过是对二元元组数据的Value进行处理罢了

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9], 1)

# mapPartitions传入的是list,所以返回的要是list
# 传入方式可以ctrl+Q进行查看


def fuck(a):
    result = []
    for i in a:
        result.append(i * 10)
    return result


rdd1 = rdd.mapPartitions()
print(rdd1.collect())

foreach算子:对RDD的数据进行处理,但是没有返回值,原因就是内部EXecutor处理完直接输出,没有发送到driver收集然后再发出,所以collect函数返回的值是None

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9])

# 使用方法相当于map算子,但是呢,没有输出结果
# 因为这里执行的时候,executor处理完之后直接输出了
# 所以driver收集不到数据,所以输出的时候没有结果
result = rdd.foreach(lambda x: x * 10)
print(result)
result = rdd.foreach(lambda x: print(x * 10))

mapPartiutions算子和map算子功能一样,不过运算速度更快

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9], 1)

# mapPartitions传入的是list,所以返回的要是list
# 传入方式可以ctrl+Q进行查看


def fuck(a):
    result = []
    for i in a:
        result.append(i * 10)
    return result


rdd1 = rdd.mapPartitions()
print(rdd1.collect())

foreachPartitions和foreach算子功能一样,不过运算速度更快

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9], 3)

# foreachPartition是action算子


def process(iter):
    result = []
    for i in iter:
        result.append(i * 10)

    print(result)


print(rdd.foreachPartition(process))

二、

reduceByKey算子:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合

"""
演示RDD的reduceByKey方法使用
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
KV型,就是元组型
# 用法
rdd.reduceByKey(func)
# func : (v,v) -> v
# 接受2哥传入参数(类型要一致),返回一个返回值,类型和传入要求要一致
"""
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf


# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([("男", 85), ("男", 10), ("女", 20), ("女", 20),("男", 5)])
# 运算机制就是找到同样的key,然后key值相加;接受一个处理函数,对数据就行两两相加,就好像85+10=95; 最后再95+5=100
# 求男女两组的分别对应成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

groupBy算子:将RDD的数据进行分组

# coding:utf8
# 导入包
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])

# 通过groupBy对数据进行分组
# groupBy传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
result = rdd.groupBy(lambda x: x[0])
print(result.map(lambda t: t[0]).collect())
print(result.map(lambda t: (t[0], list(t[1]))).collect())

groupByKey算子:针对KV型RDD,自动按照Key分组

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])

print(rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect())
# 对比一下groupBy的结果
result = rdd.groupBy(lambda x: x[0])
print(result.map(lambda t: (t[0], list(t[1]))).collect())

三、

Filter算子:保留想要的数据

"""
演示filter方法的使用
功能:过滤想要保留的数据
"""
# 1.构建执行环境入口对象
import findspark

findspark.init()
from pyspark import SparkContext, SparkConf

# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个rdd对象
rdd = sc.parallelize([1, 2, 3, 4, 5, 5, 5, 4])
# 记得要添加过滤的条件
rdd2 = rdd.filter(lambda sum: sum % 2 == 0)
print(rdd2.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

四.

distinct算子:对rdd数据进行去重,返回新的RDD

"""
演示distinct方法的使用
功能:对RDD数据进行去重,返回新的RDD
"""
# 1.构建执行环境入口对象
import findspark

findspark.init()
from pyspark import SparkContext, SparkConf

# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个rdd对象
rdd = sc.parallelize([1, 2, 3, 4, 5, 5, 5, 4])
# 记得要添加过滤的条件
rdd2 = rdd.filter(lambda sum: sum % 2 == 0)
print(rdd2.collect())
# 运用distinct方法进行去重
# 不需要传参
rdd3 = rdd2.distinct()
print(rdd3.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

五.

union算子:2个rdd合并成一个rdd返回

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 创建两个RDD
rdd1 = sc.parallelize([1, 3, 4, 5])
rdd2 = sc.parallelize(['a', 'b'])

union_rdd1 = rdd1.union(rdd2)
union_rdd2 = rdd2.union(rdd1)
print(union_rdd1.collect())
print(union_rdd2.collect())
"""类型不同也可以合并在一起
union算子是不会去重"""

六.

join算子:对两个RDD执行JOIN操作(实现SQL的内外连接)

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 创建两个RDD
rdd1 = sc.parallelize([(1001, "zhangshan"), (1002, "lishan"), (1003, "chaoren"),
                       (1004, "sihai")])
rdd2 = sc.parallelize([(1001, "市场部"), (1002, "管理部")])
# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元组的key来进行关联
print(rdd1.join(rdd2).collect())

# 左外连接,右外连接,可以更换一下rdd的顺序,或者调用rightOuterJoin即可
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())

七.

intersection算子:求两个rdd的交集,返回一个新的rdd

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 创建两个RDD
rdd1 = sc.parallelize([(1001, "zhangshan"), (1002, "lishan"), (1003, "chaoren"),
                       (1004, "sihai")])
rdd2 = sc.parallelize([(1001, "zhangshan"), (1002, "管理部")])

# 通过intersection算子求RDD之间的交集,将交集取出丢进新的rdd

rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())

八.

glom算子:将rdd的数据,加上嵌套,这个嵌套按照分数数目来进行分配

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
print(rdd1.glom().collect())
# glom——rdd不能直接解嵌套
print(rdd1.glom().flatMap(lambda x: x).collect())

九.

sortBy算子:对RDD数据进行排序,基于你指定的排序依据

"""
演示sortby方法的使用
功能:对RDD数据进行排序,基于你指定的排序依据
语法:
rdd.sortBy(func,ascending=False,numPartitions=1)
func: (T) -> U: 告知按照rdd中哪个数据进行排序,比如lambda x: x[1] 表示按照rdd的第二个元素进行排序依据
ascending Ture 升序,False 降序
numPartitions: 用多少分区排序

"""
# 1.构建执行环境入口对象
import findspark

findspark.init()
from pyspark import SparkContext, SparkConf

# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 1.读取文件
rdd = sc.textFile("./hello.txt")
# 2.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 3.转换为二元元组,单词为key,value为1
change_rdd = word_rdd.map(lambda word: (word, 1))
# 4.分组求和
result_rdd = change_rdd.reduceByKey(lambda a,b: a + b)
print(result_rdd.collect())
# 5.对结构排序
finally_rdd = result_rdd.sortBy(lambda x:x[1], ascending=True, numPartitions=1)
print(finally_rdd.collect())
# [('itcast', 1), ('python', 3), ('holy', 12), ('hello', 6)]
# [('itcast', 1), ('python', 3), ('hello', 6), ('holy', 12)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortByKey算子:针对KV型RDD,按照key进行排序

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 1), ('c', 1), ('P', 1), ('g', 1)])

# keyfunc=lambda key:str(key).lower()将key值强制转换为小写值
print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key:str(key)
                    .lower()).collect())

十.

countByKey算子:统计key出现的次数(一般使用在KV型RDD)

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 读取数据
rdd = sc.textFile("./data/hello.txt")
# 化成元组形式
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
# 计算
rdd3 = rdd2.countByKey()
# 输出
print(rdd3)

count算子:计算RDD有多少条数据,返回一个数字

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 3.第三个是count算子
# 算出该rdd有几个元素
result2 = rdd.count()
print(result2)

十一.

collect算子:对RDD各个分区内的数据,统一收集到driver,形成一个list对象

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)
print(rdd1.glom().collect())
# glom——rdd不能直接解嵌套
print(rdd1.glom().flatMap(lambda x: x).collect())

十二.

reduce算子:对rdd数据集按照你传入的逻辑进行聚合

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 56, ])
print(rdd.reduce(lambda a, b: a + b))

fold算子:和reduce一样,接收传入的逻辑进行聚合,聚合带有初始值的,这个初始值会作用在:分区内和分区间聚合

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 56], 1)
"""fold的具体运算是这样的:因为分区这里设置一个那么,先100+1,然后再101+2,每一个元素加一次,最后加完之后
应该是166才对,但是后面它还加一次我们的初始值,所以266=100+166得到的"""
print(rdd.fold(100, lambda a, b: a + b))

reduceByKey:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合

"""
演示RDD的reduceByKey方法使用
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
KV型,就是元组型
# 用法
rdd.reduceByKey(func)
# func : (v,v) -> v
# 接受2哥传入参数(类型要一致),返回一个返回值,类型和传入要求要一致
"""
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf


# 创建sc对象必须的!
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([("男", 85), ("男", 10), ("女", 20), ("女", 20),("男", 5)])
# 运算机制就是找到同样的key,然后key值相加;接受一个处理函数,对数据就行两两相加,就好像85+10=95; 最后再95+5=100
# 求男女两组的分别对应成绩之和
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd1.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

十三.

first算子:取出rdd的第一个元素

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 1.首先是first算子
# 取第一个元素输出
rdd = sc.parallelize([1, 2, 3, 4, 56])
result = rdd.first()
print(result)

take算子:取RDD的前N个元素,组成list返回给你

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 2.第二个是take算子
# 根据take的参数决定确定取从头开始,取几个元素
result1 = rdd.take(4)
print(result1)

top算子:对RDD数据集进行降序排序,取前N个

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 4.第四给是top算子
# 对元素进行降序排序,然后由参数决定输出前几个元素
result3 = rdd.top(2)
print(result3)

takeOrdered算子:对RDD进行排序前N个

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9])

# 参数是升序后,取的数量的多少
result = rdd.takeOrdered(3)
print(result)
# lambda x: -x,这里是将所有数假装把所有数取负,进行排序后按照原来的数进行排序
result1 = rdd.takeOrdered(5, lambda x: -x)
print(result1)

十四.

takeSample算子:随机抽样RDD的数据

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9])

# 参数一:Ture表示可以取相同位置的数,False表示不可以
# 参数二:取数数目的多少
# 参数三:是种子,如果种子固定,那么产生的随机数也是固定的,默认种子也是不固定的
# 如果取的数数目超过实际的话,如果参数一是Ture的话可以满足,如果是False的话那么只能是所有元素的数量
result = rdd.takeSample(False, 10, 1)
print(result)

十五.

saveAsTextFile算子:将RDD的数据写入文本文件中,支持本地写出,HDFS等文件系统

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 分区为1的时候
# rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9],1)
# rdd.saveAsTextFile("./data/out1")

# 分区为3的时候
rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9], 3)
rdd.saveAsTextFile("./data/out2")
"""因为解释器在Linux,所以在window上面要同步数据;
如何同步?先右键data文件夹,选中Deployment,点击Syac...,然后在顶部点击Syac... all"""

textFiles算子:读取外部文件,创建RDD对象

# coding:utf8
# 导入包
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 通过textFile API 读取数据
# 读取本地文件数据
file_rdd = sc.textFile("./data/hello.txt")
print("默认读取分区数", file_rdd.getNumPartitions())
print("file_rdd的内容:", file_rdd.collect())

# 加最小分区数参数的测试
file_rdd1 = sc.textFile("./data/hello.txt", 3)
file_rdd2 = sc.textFile("./data/hello.txt", 1000)
print("读取分区数", file_rdd1.getNumPartitions())
print("读取分区数", file_rdd2.getNumPartitions())


# 读取HDFS文件数据测试

file_rdd3 = sc.textFile("hdfs://node1:8020/hello.txt")

wholetextFile算子:读取外部文件,或者文件夹,创建RDD对象

# coding:utf8
# 导入包
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

# 读取小文件夹
rdd = sc.wholeTextFiles("./data/tiny_files")
print("内容:", rdd.collect())
print(rdd.map(lambda x: x[1]).collect())

十六.

partitionBy算子:对RDD进行自定义分区操作

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('haddop', 1), ('hello', 1), ("hi", 1), ('fuck', 1)])


# 使用partitionBy 自定义分区


def process(l):
    if l == 'hadoop' or l == 'hello': return 0
    if l == 'hi': return 1
    return 2


# 3是代表分区的重新分区的数量
# process是自定义规则的函数的名字
print(rdd.partitionBy(3, process).glom().collect())

repartitions算子:对RDD的分区执行重新分区(仅仅数量)

# coding:utf8
# 导入包
import os

os.environ['JAVA_HOME'] = "/export/server/jdk1.8.0_241"
from pyspark import SparkContext, SparkConf

# 构建SparkText对象
conf = SparkConf().setMaster("local[*]").setAppName("test1")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 8, 645, 789, 8792, 654, 789, 65, 23, 65, 7, 8, 9], 3)

# 通过repartition修改分区--增加分区
# repartition是在coalesce的基础改进的,更加推荐
print(rdd.repartition(1).getNumPartitions())
print(rdd.repartition(5).getNumPartitions())

# coalesce修改分区--增加分区
print(rdd.coalesce(1).getNumPartitions())
# print(rdd.coalesce(5).getNumPartitions())
print(rdd.coalesce(5, shuffle=True).getNumPartitions())

ok!基本上就先分享这么多了!

好了,今天的分享就这么多了,有什么不清楚或者我写错的地方,请多多指教!

私信,评论我呗!!!!!!

分享一张萌图!

猜你喜欢

转载自blog.csdn.net/hhR888888/article/details/129089449