RDD的算子分为两类:
- Transformation转换操作:返回一个新的RDD
- Action动作操作:返回值不是RDD(无返回值或返回其他的)
1、Transformation 转换 算子(lazy)
- RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算。
- Spark仅记录作用于RDD上的转换操作逻辑,当遇到动作算子( Action)时才会进行真正计算。
- RDD整体上分为Value类型和Key-Value类型。
RDD常见转换算子如下表:
Transformation算子 | Meaning(含义) |
---|---|
map(func) | 通过函数func作用于 源 RDD中 的每个元素,返回一个新的 RDD |
filter(func) | 过滤,选择函数func为真的元素,返回一个新的RDD |
flatMap(func) | 压平打散,返回一个新的RDD,类型为Seq |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是 zeroValue. seqOp的操作是遍历分区中的所有元素 (T),第一个 T跟 zeroValue做操作,结果再作为与第二个 T做操作的 zeroValue,直到遍历完整个分区。 combOp操作是把各分区聚合的结果,再聚合 |
sortByKey([ascending], [numTasks]) | 根据key进行排序,默认为升序 ascending: Boolean = true |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积,在类型为T和 U的数据集上调用时,返回一个 (T, U)对 (所有对元素 )的数据集 |
pipe(command, [envVars]) | 通过shell命令 (例如 Perl或 bash脚本 )对 RDD的每个分区进行管道传输。将 RDD元素写入进程的stdin,并将其输出到 stdout的行作为字符串 RDD返回 |
coalesce(numPartitions) | 将RDD中的分区数量减少到 numpartition |
repartition(numPartitions) | 随机地重新Shuffle RDD中的数据,以创建更多或更少的分区,并在它们之间进行平衡 |
2、 Actions(non-lazy)动作算子
Actions | Meaning |
---|---|
reduce(func) | 使用函数func(它接受两个参数并返回一个 )聚合数据集的元素 |
collect() | 在驱动程序Driver 中以数组的形式返回数据集的所有元素 |
count() | 返回数据集中元素的数量 |
first() | 返回数据集的第一个元素(类似于 take(1)) |
take(n) | 返回一个包含数据集前n个元素的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,其中包含数据集的随机num元素样本,可以替换,也可以不替换,可以预先指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 使用RDD的自然顺序或自定义比较器返回 RDD的前n个元素 |
saveAsTextFile(path) | 将数据集的元素作为文本文件(或文本文件集 )写入本地文件系统、 HDFS或任何其他 hadoop支持的文件系统的给定目录中。 Spark将对每个元素调用 toString,将其转换为文件中的一行文本 |
saveAsSequenceFile(path) | 将数据集的元素作为Hadoop SequenceFile写入本地文件系统、 HDFS或任何其他 Hadoop支持的文件系统的给定路径中。这在实现 Hadoop的可写接口的键值对的 RDDs上是可用的。在 Scala中,它也可用于隐式转换为可写的类型 (Spark包括对Int、 Double、 String等基本类型的转换 ) |
saveAsObjectFile(path) | 使用Java序列化以简单的格式编写数据集的元素,然后可以使用 SparkContext.objectFile()加载这些元素 |
countByKey() | 仅在类型(K, V)的 RDDs上可用。返回 (K, Int)对的Map表示 每个键的计数 |
foreach(func) | 对数据集的每个元素运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
fold | def fold(zeroValue: T)(op: (T, T) ⇒ T): T fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op |
lookup | def lookup(key: K): Seq[V]lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值 |
aggregtate | def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作 |
3、案例
3.1 Transformation
见优秀博客
3.2 Action
见优秀博客
参考资料:
https://blog.csdn.net/and52696686/article/details/107822714