压缩
Hive 压缩的目的是:减少磁盘 IO 与网络 IO
-
配置方式
- 将 snappy 文件解压缩到 hadoop/lib/native
- 使用 hadoop checknative 检验是否可用
-
snappy 文件的处理
- 参照 hadoop 源码下的 building.txt 进行编译
- 下载 snappy 库文件 http://google.github.io/snappy/
- 安装 snappy,将相关 jar 放在 lib 库中
- 编译
常见压缩总结
- 压缩格式:bzip2, gzip, lzo, snappy
- 压缩比:bzip2 > gzip > lzo
- 压缩速度:bzip2 < gzip < lzo
压缩格式与 hive 命令
zlib -> org.apache.hadoop.io.compress.DefaulCodec
gzip -> org.apache.hadoop.io.compress.GzipCodec
Bzip2 -> org.apache.hadoop.io.compress.BZip2Codec
Lzo -> com.hadoop.compression.lzo.LzoCodec
Lz4 -> org.apache.hadoop.io.compress.Lz4Codec
Snappy -> org.apache.hadoop.io.compress.SnappyCodec
压缩方案
压缩方案
MapReduce 配置 map 端压缩:
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
MapReduce 配置 Reduce 端压缩:
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
Hive 配置压缩
hive.exec.compress.intermediate=true
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
mapred.output.compression.type=BLOCK
任务中间压缩
hive.exec.compress.intermediate=true
hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
hive.intermediate.compression.type=BLOCK
时间处理
- 自定义 UDF 函数
- 时间函数
- 标准日期格式:2017-12-31 00:00:00
- Unixtimestamp:1970-01-01 00:00:00至今的总秒数
//需标准格式 select unix_timestamp("2017-12-31 00:00:00"); select from_unixtime(1486543750); select unix_timestamp('201712331 00:00:00','yyyyMMdd HH:mm:ss');
数据倾斜
数据倾斜造成的原因:当到达 reduce 端数据如果在某一个 key 上分布特别多的话,就会造成单个节点处理时间异常增多,从而导致整体任务消耗严重。
关键词 | 情况 | 后果 |
---|---|---|
join | 其中一个表较小,但 key 集中 | 分发到某一个或几个 reduce 上的数据远高于平均值 |
join | 大表与大表,但是分桶的判断字段 0 值或空值过多 | 这些控制都是由一个 reduce 处理,非常慢 |
group by | 维度过小,某值的数量过多 | 处理某值的 reduce 非常耗时 |
count distinct | 某特殊值过多 | 处理此特殊值的 reduce 耗时 |
-
数据倾斜的具体原因:
- key 分布不均匀
- 业务数据本身的特征
- 建表时考虑不周
- 某些 sql 语句本身就有数据倾斜
-
数据倾斜的具体表现:
任务进度长时间维持在 90%,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其吹的数据量和其他 reduce 差异过大。
单一 reduce 的记录数与平均记录数差异过大,通常可能达到 3 倍甚至更多,最长时长远大于平均时长。
方案
-
调节参数
提高 HiveQL 聚合的执行性能hive.map.aggr=true
, 在 map 中会做部分聚集操作,效率更高但需要更多的内存。数据倾斜时负载均衡,当选项hive.groupby.skewindata=true
,生成的查询计划会有两个MRJob。第一个 MRJob 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 GroupBy Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MRJob 再根据预处理的数据结果按照 GroupBy Key 分布到 Reduce 中(这个过程可以保证相同的 GroupBy Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。 -
SQL 语句调节
select * from log a where a.user_id is null;
select * from users a left outer join logs b on a.user_id=cast(b.user_id as string)
- 表的设计
优化处理
-
表拆分
大表拆分成小表,分区表,外部表,临时表分区表是在一个或多个维度上对数据进行分类存储,一个分类(分区)对应一个子目录。在这样的存储方式中,如果筛选的条件中有分类字段,那么 hive 只需要遍历相应的子目录下的文件即可,而不用全局遍历。提高查询的时间。
-
MR 优化
所谓的 MR 优化,就是针对 map 和 reduce 的操作进行优化处理。因此就涉及到了 map 优化和 reduce 优化。map 端:
hadoop 源码中有一个计算公式, 决定了 map 的个数: min(max_split_size,max(min_split_size,block_size))min_split_size默认值0(最小分片大小) block_size,block_size 默认是128 max_split_size 默认值256(最大分片大小)
一个在实际的生产环境中 HDFS 一旦 format 格式化之后,
block_size
大小不会去修改的,可以通过max_split_size
和min_split_size
来影响map的个数。reduce 端
在 reduce 端可以调整 reduce task 的个数set mapred.reduce.tasks=number
。 -
并行执行
hive sql 在底层会运行 MR job,每个 job 就是一个 stage,这些 job 是顺序执行的,但是 针对有些互相没有依赖关系的独立的 job, 可以选择并发的执行 job,这样就能提高并行度,节约运行时间。hive.exec.parallel # 参数控制在同一个sql中的不同的job是否可以同时运行 hive.exec.parallel.thread.number # 对于同一个sql来说同时可以运行的job的最大值, 该参数默认为8. 此时最大可以同时运行8个job.
一般在工作中会选择去开启该功能,需根据实际的集群的状况和服务器的性能合理的设置线程数目。
但是在系统资源匮乏时,启用并行反而会导致各 job 抢占资源而导致整体的执行性能下降,这点需要注意。
-
JVM重用
hive 的操作涉及到 MR job,每一个 MR job 是由一系列的 map task 和 reduce task 组成,每个 map 或 task task 会启动一个 JVM 进程,一旦 task 执行完毕,JVM 就会退出。如果不听的启动或关闭 JVM,JVM 的运行将是一个比较耗时的操作,因此,可以采用 JVM 重用来处理。
MR默认JVM运行, 开启 JVM 开启多个任务mapreduce.job.jvm.num.tasks=number
也就是说,每个 JVM 在执行 number 个任务后才退出,这样可以节省 JVM 的操作时间。set mapreduce.job.jvm.num.tasks = 10;
-
推测执行
当某个任务出现迟迟不结束的情况, 开启推测执行会开启一个一模一样的任务, 其一但完成关闭另一个.
分为 map 端的推测和 reduce 端的推测,但过多的消耗资源, 可能会出现重复写入的情况。mapreduce.map.speculative mapreduce.reduce.speculative
-
hive 本地模式
hive 在集群上查询时,默认是在集群上的多台机器上运行,需要集群上的机器协调运行。这对于查询大数据量是有好处的,但是当查询的数据量较小时,就没有必要启动集群方式来进行查询处理了,毕竟集群涉及到了网络,多点协调等问题,对于小数据量而言可能更耗资源。因此可以只是用本地模式来进行 hive 操作。要求:
hive.exec.mode.local.auto = false hive.exec.mode.local.auto.input.files.max => 数据大小不能超过 128 MB hive.exec.mode.local.input.files.max => map 不能超过 4 个 reduce 的个数不能超过 1 个
同时可以针对查询进行不执行 MR 来进行处理,参数:
hive.fetch.task.conversion = more
,表示在 select,where,limit 操作时,都直接进行数据抓取操作,而不涉及 MR 计算的操作,这样执行的效率更快更高。set hive.fetch.task.conversion= more; select * from student08 where age is not null;