概述
- 之前和大家聊Hive Streaming Sink的时候说过,可以通过指定参数
sink.partition-commit.policy.kind
,来决定在提交分区时要做的事,比如合并小文件 - 本身Hive Streaming Sink是基于FileSystem Streaming Sink,FileSystem Streaming Sink其实已经做了保护,减少小文件的产生。主要是这两个参数
sink.rolling-policy.file-size
:文件超过多大会切分sink.rolling-policy.rollover-interval
:如果文件大小一直没达到可以切分的大小,保持多久与这个文件的连接,一旦超过了这个时间,就会写到一个新文件
- 大家可能会觉得,既然做了保护了,又哪来的小文件呢?
- 因为在写入的时候,假设并行度为N,此时数据量不大,但是每个并行度都有数据的话,就会产生N个文件,每个文件都很小。这时候就会出现小文件过多的问题
- 秉着光说不干假把式的原则,这就和大家聊一下,如何通过Flink x Zeppelin来实现Hive Streaming Sink的时候,合并上一个小时的小文件
- 首先简单说一下如何实现的
- 在做完checkpoint,准备提交分区的时候,通过Http请求Zeppelin,建立一个合并小文件的任务,然后再提交分区,整个过程非常高
- Zeppelin那边接收到请求后,执行合并小文件的任务
- 其实逻辑很简单,就是发送请求,具体执行任务的逻辑不在当前那个Hive Streaming Sink里面做,主要原因是合并小文件需要时间,如果在提交分区的时候去同步执行,会影响下游读取数据的任务
- 那为啥合并的是上一个小时的分区而不是当前分区呢?
- 第一点和上面一样,如果合并当前分区,那就得阻塞着等待合并分区完成之后再提交分区,会影响下游读取
- 第二点,当前分区都没提交怎么找得到当前分区呢?
- 总结来说就是三个优点:异步、解耦、快速返回
- 下面让我们开始看看怎么搞得吧
操作
- 我是通过改源码的方式去实现的合并小文件,所以如果打算跟着我的逻辑走的话,请先下载对应版本Flink源码
- 目前有些逻辑是写死的,比如固定是合并一小时之前的分区,而且分区得是分钟级别的三层分区,这些都可以写活,目前主要是为了测试所以暂时写死
- 下面直接贴代码吧
package org.apache.flink.connectors.hive; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.table.filesystem.PartitionCommitPolicy; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.URL; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; public class CombineFileCommitPolicy implements PartitionCommitPolicy { private static final Logger LOG = LoggerFactory.getLogger(CombineFileCommitPolicy.class); private static final String URI_CREATE = "http://10.70.98.1:8080/api/notebook/2FDKM8CQH/paragraph/"; private static final String URI_RUN = "http://10.70.98.1:8080/api/notebook/job/2FDKM8CQH/"; @Override public void commit(Context context) throws Exception { LinkedHashMap<String, String> map = context.partitionSpec(); if (CollectionUtil.isNullOrEmpty(map)) return; String condition = getCondition(map); LOG.info("condition is {}", condition); if (StringUtils.isNullOrWhitespaceOnly(condition)) { LOG.error("condition is null !!!"); return; } StringBuffer sb = new StringBuffer(); String sql = String.format("insert overwrite `%s` select * from `%s` where %s", context.tableName(), context.tableName(), condition); String text = String.format("%%flink.bsql\n%s ;", sql); String title = String.format("combine small file"); String paragraphId = createParagraph(title, text); if (StringUtils.isNullOrWhitespaceOnly(paragraphId)) { LOG.error("paragraphId is null !!!"); return; } String status = runParagraph(paragraphId); if ("OK".equals(status)) { return; } else { LOG.error("run paragraph error !!! status is {}", status); } } private String getCondition(Map<String, String> map) throws ParseException { StringBuffer sb = new StringBuffer(); //因为知道分区是怎么样的,所以先写死,改起来也简单,先不改了 try { String dt = map.get("dt"); String hr = map.get("hr"); String mi = map.get("mi"); String datetime = String.format("%s %s:%s:00", dt, hr, mi); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = format.parse(datetime); long timestamp = date.getTime() - 1 * 3600 * 1000 ; String finalDate = format.format(timestamp); dt = finalDate.substring(0, 10); hr = finalDate.substring(11, 13); mi = finalDate.substring(14, 16); sb.append("dt = '").append(dt).append("' and ").append("hr = '").append(hr).append("' and ").append("mi = '").append(mi).append("' ;"); } catch (Exception e) { LOG.error("get condition error !!!,e is ", e); } return sb.toString(); } private String createParagraph(String title, String text) { HttpURLConnection connection = null; StringBuilder result = null; ObjectMapper objectMapper = null; String paragraphId = null; try { URL url = new URL(URI_CREATE); connection = (HttpURLConnection) url.openConnection(); connection.setDoInput(true); // 设置可输入 connection.setDoOutput(true); // 设置该连接是可以输出的 connection.setRequestMethod("POST"); // 设置请求方式 connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); objectMapper = new ObjectMapper(); Map<String, Object> data = new HashMap<String, Object>(); data.put("title", title); data.put("text", text); PrintWriter pw = new PrintWriter(new BufferedOutputStream(connection.getOutputStream())); pw.write(objectMapper.writeValueAsString(data)); pw.flush(); pw.close(); BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); String line = null; result = new StringBuilder(); while ((line = br.readLine()) != null) { // 读取数据 result.append(line + "\n"); } LOG.info("create paragraph result is {}", result); Map resMap = objectMapper.readValue(result.toString(), HashMap.class); paragraphId = resMap.get("body").toString(); } catch (Exception e) { LOG.error("create paragraph error !!! ,e is ,", e); } finally { if (connection!=null) connection.disconnect(); } return paragraphId; } private String runParagraph(String paragraphId) { HttpURLConnection connection = null; ObjectMapper objectMapper = null; String status = null; try { URL url = new URL(URI_RUN + paragraphId); connection = (HttpURLConnection) url.openConnection(); connection.setDoInput(true); // 设置可输入 connection.setDoOutput(true); // 设置该连接是可以输出的 connection.setRequestMethod("POST"); // 设置请求方式 connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.connect(); BufferedReader br = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); String line = null; StringBuilder result = new StringBuilder(); while ((line = br.readLine()) != null) { // 读取数据 result.append(line + "\n"); } LOG.info("run paragraph result is {}", result); objectMapper = new ObjectMapper(); Map resMap = objectMapper.readValue(result.toString(), HashMap.class); status = resMap.get("status").toString(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection!=null) connection.disconnect(); } return status; } public static void main(String[] args) throws Exception { } }
- 逻辑很简单,通过当前要提交的分区获取一小时之前的分区,然后组装成Sql提交到Zeppelin上新建个任务,再把刚才新建的任务给Run起来
- 这里面用到了Zeppelin的REST API,具体可以参考Zeppelin官网的文档REST API
- 要注意一点的是,执行Stream任务和Batch任务的Flink集群不能是同一个集群,否则会报错,因为Batch任务不支持checkpoint,所以建议使用Yarn模式+Notebook per note模式,这样每个Notebook会对应一个Flink集群,可以通过Yarn的特点来快速拉起一个集群
- 还有一点,Batch任务的Notebook需要先执行一下这个代码
%flink btenv.getConfig().getConfiguration().setBoolean("table.exec.hive.infer-source-parallelism",false);
- 这个参数
table.exec.hive.infer-source-parallelism
的意思是让读Hive表的时候,Source的并行度不跟着Hive文件个数走;为啥加这个参数呢?因为如果不加的话,还是会有多个并行度去写出文件,假设读两个文件再写出两个文件,那么就起不到合并小文件的作用了。可以参考Source Parallelism Inference - 然后我们打包,到
${FLINK_SRC}/flink-connectors/flink-connector-hive
这个目录下mvn clean install -Dcheckstyle.skip=true -Dmaven.test.skip=true -Drat.skip=true -Pscala-2.11
,然后用target
目录下的flink-connector-hive_2.11-1.11.0.jar
覆盖{FLINK_HOME}/lib
下的同名jar包 - 如果上面的步骤没啥问题的话,下面开始我们的验证阶段
验证
-
启动一个Hive Streaming Sink的任务
%flink.ssql(parallelism=2) SET table.dynamic-table-options.enabled=TRUE; insert into hive_table2 select user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table /*+ OPTIONS('scan.startup.mode' = 'earliest-offset' )*/
-
记得配置一下checkpoint,否则任务一直会是In-progress状态,无法提交分区
%flink.conf # checkpoint 配置 pipeline.time-characteristic EventTime execution.checkpointing.interval 120000 execution.checkpointing.min-pause 60000 execution.checkpointing.timeout 60000 execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION # 依赖jar包配置 flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0 flink.execution.jars /Users/dijie/apps/hadoop-2.7.5/share/hadoop/common/lib/hadoop-lzo.jar
-
任务启动之后让我们看一下HDFS上的文件
-
因为我设置了两个并行度,所以写出了两个小文件
-
我这边设置的是合并一小时之前的分区,所以,先休息会儿,喝一杯 95 年的 Java☕️ 。
-
等时间到了之后,我们再瞄一眼HDFS上的文件
-
可以看到,小文件成功被合并了!
-
再让我们看一眼Zeppelin上的合并任务吧!
-
哦?还发现个自己代码的Bug,多了一个分号,还好Zeppelin这边本身就有容错~留个坑,我找到问题在哪了,大家可以自己修改一下
最后
- 最近被公众号
Flink 中文社区
约稿了,我写完这篇的时候还没发布,等发布了大家有兴趣可以看看,内容先神秘一下,留着大家自己一探究竟 - 然后大家也可以不必修改源码,可以自己搞个jar包,然后通过Zeppelin加载也行
- 不想自己编译也不想写代码的同学可以在这里下载到Flink 1.11.0版本的我改过源码的Flink-connector-hive的jar包(一直审核中,如果有需要可以留邮箱我给你发)
最后,向大家宣传一下Flink on Zeppelin 的钉钉群,大家有问题可以在里面讨论,简锋大佬也在里面,有问题直接提问就好(一群已满,请加二群)