三十五、Hive的优化

本文主要来讲述一下Hive的优化问题,关注专栏《破茧成蝶——大数据篇》,查看更多相关的内容~


目录

一、Hive的数据存储格式

二、Hive调优

2.1 Fetch

2.2 本地模式查询

2.3 表与表之间的Join

2.4 Map端聚合操作

2.5 去重统计

2.6 笛卡尔积

2.7 查询优化

扫描二维码关注公众号,回复: 12501647 查看本文章

2.8 开启动态分区

2.9 设置合理的Map和Reduce数量

2.10 JVM重用

2.11 strict模式

2.12 并行执行

2.13 推测执行

2.14 Explain语句


一、Hive的数据存储格式

在进行优化讲解之前,我们先来看看Hive有哪些数据的存储格式。Hive支持的存储数据的格式主要有四种,分别是:TEXTFILE 、SEQUENCEFILE、ORC和PARQUET。其中,TEXTFILE和SEQUENCEFILE的存储格式都是基于行存储的;而ORC和PARQUET是基于列式存储的。当查询满足条件的一整行数据时,列式存储则需要去每个聚集的字段找到对应的每个列的值,行式存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。当查询只需要少数几个字段的时候,列式存储能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好压缩算法。具体的压缩方式的介绍,可以参看咱们之前的《十六、Hadoop中的数据压缩》。接下来,看咱们今天的正题——Hive的调优。

二、Hive调优

2.1 Fetch

在Hive中,有些情况的查询是不用走底层MapReduce的,例如查询某个表的所有数据,即:select * from table_name;,这种情况也叫Fetch抓取。这是因为在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,如下所示:

当把该值设置为none的时候,所有的查询操作都会走底层的MapReduce。

2.2 本地模式查询

当Hive的输入数据量非常小时,可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。通过设置hive.exec.mode.local.auto的值为true,让Hive在适当的时候自动启动这个优化,默认是false。

//开启本地MapReduce
set hive.exec.mode.local.auto=true;
//设置local mapr的最大输入数据量,当输入数据量小于这个值时采用local MapReduce的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//设置local MapReduce的最大输入文件个数,当输入文件个数小于这个值时采用local MapReduce的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;

2.3 表与表之间的Join

当小表与大表join的时候,一般选择MapJoin,将小表加载到内存,避免在ReduceJoin,也就是说避免了数据倾斜。

//设置自动选择MapJoin,默认为true
set hive.auto.convert.join = true;
//大表小表的阈值设置,默认25M以下是小表
set hive.mapjoin.smalltable.filesize=25000000;

当大表与大表join的时候,有时超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空。有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。

表与表之间的优化,可以参考咱们之前的《十四、MapReduce中的Join操作》《十八、Hadoop的优化》。说到底,Hive的优化其实就是Hadoop的优化,因为Hive的底层是依赖于MapReduce的。

2.4 Map端聚合操作

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时会造成数据倾斜。其实并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。通过以下设置开启Map端聚合操作:

//是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
//在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
//有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;

当选项设定为 true,生成的查询计划会有两个MapReduce Job。第一个MapReduce Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MapReduce Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。

2.5 去重统计

当数据量大的情况下,由于COUNT DISTINCT的全聚合操作,即使设定了reduce task个数,set mapred.reduce.tasks=100;hive也只会启动一个reducer。这就造成一个Reduce处理的数据量太大,导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换。

2.6 笛卡尔积

在join的时候尽量避免无效的on条件,因为Hive只能使用一个Reducer来完成笛卡尔积。

2.7 查询优化

查询数据时,只取需要的列少用SELECT *。当使用外关联时,将join后面表的过滤条件写在join里面,避免全表关联。

2.8 开启动态分区

Hive中提供了动态分区的功能,对分区表Insert数据的时候,会自动根据分区字段的值,将数据插入到相应的分区中,通过以下方式开启自动分区功能。

//开启动态分区功能,默认true
hive.exec.dynamic.partition=true;
//设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict;
//在所有执行MapReduce的节点上,最大一共可以创建多少个动态分区,默认1000
hive.exec.max.dynamic.partitions=1000;
//在每个执行MapReduce的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100;
//整个MapReduce Job中,最大可以创建多少个HDFS文件。默认100000
hive.exec.max.created.files=100000;
//当有空分区生成时,是否抛出异常。一般不需要设置。默认false
hive.error.on.empty.partition=false;

2.9 设置合理的Map和Reduce数量

通常情况下,job会通过input的目录产生一个或者多个map任务。map任务主要的决定因素有:input的文件总个数、input的文件大小以及集群设置的文件块大小。并不是map数越多越好,如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是有限的。同时,保证每个map处理接近128m的文件块也并不是就万事大吉了。例如有一个127M的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有成百上千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也会非常耗时。

所以,我们必须根据不同的场景灵活的设定合理的Map和Reduce的数量。这里列出了以下几种情况。

1、当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。

2、在map执行前合并小文件,减少map数。CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。通过如下设置实现在map执行前合并小文件:

set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

3、下面的命令是在MapReduce任务结束时合并小文件:

(1)在map-only任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;

(2)在map-reduce任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;

(3)合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;

(4)当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;

4、reduce个数并不是越多越好,过多的启动和初始化reduce也会消耗时间和资源。另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,那么又会出现小文件过多的问题。通过下面的设置调整reduce的个数:

(1)通过下面的设置修改每个Reducer处理的数据量,默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000;

(2)通过下面的设置修改每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009;

(3)在hadoop的mapred-default.xml文件中修改下面的参数,设置每个job的Reduce个数
set mapreduce.job.reduces = 15;

2.10 JVM重用

JVM重用对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,需要结合具体的业务场景进行设置。这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit. 
  </description>
</property>

2.11 strict模式

Hive开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。

1、对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。

2、对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。

3、限制笛卡尔积的查询。在关系型数据库中执行JOIN查询的时候不使用ON语句而使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。但是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

2.12 并行执行

Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某些特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。通过设置参数hive.exec.parallel值为true,就可以开启并行执行。

//打开任务并行执行
set hive.exec.parallel=true;

//同一个sql允许最大并行度,默认为8。
set hive.exec.parallel.thread.number=16;

2.13 推测执行

在分布式集群环境下,因为程序的Bug或者Hadoop本身的bug、负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务,例如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕,则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测运行缓慢的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。

在Hadoop的mapred-site.xml文件中配置mapreduce.map.speculative和mapreduce.reduce.speculative为true开启推测执行参数,该配置项默认是true。不过Hive本身也提供了配置项hive.mapred.reduce.tasks.speculative.execution来控制reduce-side的推测执行,该配置项默认是true。

2.14 Explain语句

在Hive中可以使用Explain语句来查看语句的执行计划,其语法如下:

EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query

例如:

1、查看如下语句的执行计划:

explain select * from people;

2、查看详细的执行计划

explain extended select * from people;

好了,以上就是Hive的调优内容了,本文到此已经接近尾声,你们在这个过程中遇到了什么问题,欢迎留言,让我看看你们遇到了什么问题~

猜你喜欢

转载自blog.csdn.net/gdkyxy2013/article/details/111466765