【spark】存储数据到hdfs,自动判断合理分块数量(repartition和coalesce)(一)

本人菜鸟一只,也处于学习阶段,如果有什么说错的地方还请大家批评指出!

首先我想说明下该文章是干嘛的,该文章粗略介绍了hdfs存储数据文件块策略和spark的repartition、coalesce两个算子的区别,是为了下一篇文章的自动判断合理分块数做知识的铺垫,如果对于这部分知识已经了解,甚至精通的同学,可以直接跳到该系列的第二篇文章!

背景:

spark读取Hive表或者HDFS甚至各种框架的数据,生成了Rdd或者Dataset(或者DataFrame,但是在spark2以后的版本DataFrame这个对象已经被取消了,全部用Dataset替代),经过一大堆逻辑处理之后,将数据又存回了HDFS。但是这时候就会遇到一个问题,就是这份数据在HDFS上会被分成几份,个数是否合理?

原因:

为什么要考虑这个问题?是因为HDFS不适合存太多小文件,因为每个文件块都会有一段地址存在namenode的内存中,如果太多小文件或者目录太深,会大大降低HDFS存储的性能,而且在某些极端的情况下,spark甚至会存储很多空文件,这些空文件还会影响之后的计算(因为需要启动一个map来读取这个空文件,消耗了资源和时间,却在做无用功)

例如:

如图,我们想要的是,每个文件块的大小都在几十M甚至200M,300M之间(不需要十分的精确)。

但是每个文件也不能太大,为什么?

压缩格式     工具     算法     文件扩展名     是否可切分
DEFLATE     无     DEFLATE     .deflate    
Gzip     gzip     DEFLATE     .gz    
bzip2     bzip2     bzip2     .bz2    
LZO     lzop     LZO     .lzo    
LZ4     无     LZ4     .lz4    
Snappy     无     Snappy     .snappy    

该表格来自于:(作者:瓜牛呱呱)https://blog.csdn.net/lin_wj1995/article/details/78967486

解释下:hadoop的MR(MapReduce)和spark的引擎在读取数据的时候,一个block可能会使用多个线程一起读。

例如:

线程1读取偏移量:0+10000的数据

线程2读取偏移量:10001+20000的数据

.....

线程7读取偏移量:60001+62043的数据

如图(原谅我数据不够大,只有一个线程在读,spark默认是128M启动一个线程读,也就是说如果文件有300M,那么spark会启动3个核心来同时读取这个数据):

以此类推,来加快数据读取的速度,但是问题来了,如果HDFS上的数据做了压缩,那就只有bzip2的压缩格式才可以多个线程加速读的,所以如果一个文件块非常大的话,只能用一个线程来读这个非常大的文件,读取数据的时间就会非常的长,延长任务运行时间。因此,相对合理的方式就是每个文件块大小在128M之间,这样不管是否压缩,读取数据的时候都不会受到太大的影响。

前言:

来看2个API(repartition和coalesce)和两个问题

1、repartition和coalesce的区别

-1.先说shuffle:

spark有个阶段叫shuffle,他的shuffle和MR的shuffle不一样,但是都会经历重分区的过程,如何判断什么时候有shuffle阶段呢?看看代码中是否有这些过程就好了,例如:去重,join,group by ,各种聚合函数(reduceByKey等),排序。没有shuffle的spark代码,就相当于只对数据做简单的过滤,对应到MR上,就是只有map逻辑,没有reduce逻辑,所以代码中shuffle越多,消耗的资源也越多,速度也会越慢,因为spark的shuffle过程和MR的reduce之前的shuffle过程一样,数据需要在不同的节点之间传递。

-2.repartition和coalesce的区别:

repartition会触发shuffle,coalesce不会,所以repartition性能比coalesce差!

看spark源码:

  def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
  }


  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = false, logicalPlan)
  }

-3.原理:repartition将所有数据重新分区,coalesce是单纯将不同分区的数据直接合并到一个分区里。直观来看,repartition之后,每个文件块大小会基本一样,coalesce之后,每个文件块大小一般都是不一样的,甚至会差很多。

-4.总结:repartition一般是用来增加分区数(当然也可以减少),coalesce只能用来减少分区数。所以如果不介意保存的文件块大小不一样,可以使用coalesce来减少分区数,保存的时候一个分区就会生成一个文件块。

 

2、spark文件块什么时候增加的,增加有什么用?

接下来的文字描述,是针对于sparksql(也就是把数据加载成Dataset之后再处理)来说的。

-1.增加分区数,可以增加并行度,当spark申请的cpu核心足够的情况下,可以同时跑不同分区的数据(因为一个分区的数据,只能由一个核心来跑,不能多个)

-2.手动增加,使用repartition来将所有数据打散

-3.自动增加,spark有个参数:spark.sql.shuffle.partitions,默认值为200。也就是说当触发shuffle逻辑的时候,数据会自动分为200个分区运行,但是在数据量大的情况下,每个分区的数据量太大,而且假设spark申请到了300个核心,但是因为分区数只有200,会导致只有200个核心在运行,另外100个核心在空转(虽然占用资源但是却不干活)。所以可以将该参数设置为500甚至更大,来增加分区数和并行度。

3、spark文件块在保存前如何减少?

在上一个(计算的)步骤,我们将数据增加分区,一个分区会生成一个文件块,如果没有做任何修改,并且spark.sql.shuffle.partitions参数值设置为200,那么不管这个数据多大,都会生成200个文件块。所以减少文件块的方法就是通过在数据保存之前调用repartition或者coalesce这两个API来减少或者增加文件块。

例如:

Dataset<Row> tb = spark.table("数据库.表名")
       .groupBy(col("日期"))
       .agg(countDistinct(col("id")).as("uv"),count(lit(1)).as("pv"))
       .select("日期","uv","pv");
 
//这里也可以使用coalesce来代替repartition
tb.repartition(1)
.write()
.partitionBy("日期")
.mode(SaveMode.Overwrite)
.format("hive").saveAsTable("数据库.新的表名");

讲到这里,相信大家对于hdfs存储数据文件块策略和spark的repartition、coalesce两个算子有了初步的了解,下一篇文章会进入正题:有哪些方式可以自动且合理的判断保存的文件个数?

好了,本人菜鸡一个,如果有什么说错或者说的不严谨的地方,还请大家批评指出~!

未完待续~

猜你喜欢

转载自blog.csdn.net/lsr40/article/details/84968923