RDD:RDD分区数,若从HDFS创建RDD,RDD的分区就是和文件块一一对应,若是集合并行化形式创建,RDD分区数可以指定,一般默认值是CPU的核数。
task:task数量就是和分区数量对应。
1、transformation:
(1)map(func):将函数应用于RDD中的每一个元素,将返回值构成新的RDD。
rdd.map(x=>x+1)
如:{1,2,3,3} 结果为 {2,3,4,4}
(2)mapPartitions(func):函数中传入的参数是迭代器,迭代器里面保存的是一个分区里面的数据。
/** * makeRDD方法的第一个参数代表的是RDD中的 元素 * 第二个参数:RDD的分区数 * rdd[Int] */ val rdd = sc.makeRDD(1 to 10,3) /** * mapPartitions这个算子遍历的单位是partition * 会将一个partition的数据量全部加载到一个集合里面 */ val mapPartitonsRDD = rdd.mapPartitions(iterator=>{ val list = new ListBuffer[Int]() //创建一个数据库连接 while(iterator.hasNext){ val num = iterator.next() list.+=(num+100) } //批量插入数据库 list.iterator }, false) /** * 想要执行,必须有action类的算子 * collect算子会将集群中计算的结果回收到Driver端,慎用 */ val resultArr = mapPartitonsRDD.collect() resultArr.foreach { println }
map和mapPartition的异同:
mapPartition function一次处理一个分区的数据,性能比较高;
map的function一次只处理一条数据。
如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
SparkSql或DataFrame默认会对程序进行mapPartition的优化。
参考博客:https://blog.csdn.net/wuxintdrh/article/details/80278479
(3)reduceByKey(func,[numTask]):找到相同的key,对其进行聚合,聚合的规则由func指定。
reduce任务的数量可以由numTask指定
goodsSaleRDD.reduceByKey((x,y) => x+y)
参考博客:https://www.jianshu.com/p/af175e66ce99
(4)
2、action: