1. 引入
Hudi支持以下存储数据的视图
- 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能
- 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
- 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。
使用Hudi自带的DeltaStreamer工具写数据到Hudi,
开启--enable-hive-sync 即可同步数据到hive表。
写操作
在此之前,了解Hudi数据源及delta streamer工具提供的三种不同的写操作以及如何最佳利用它们可能会有所帮助。
这些操作可以在针对数据集发出的每个提交/增量提交中进行选择/更改。
- UPSERT(插入更新) :这是默认操作,在该操作中,通过查找索引,首先将输入记录标记为插入或更新。
在运行启发式方法以确定如何最好地将这些记录放到存储上,如优化文件大小之类后,这些记录最终会被写入。
对于诸如数据库更改捕获之类的用例,建议该操作,因为输入几乎肯定包含更新。 - INSERT(插入) :就使用启发式方法确定文件大小而言,此操作与插入更新(UPSERT)非常相似,但此操作完全跳过了索引查找步骤。
因此,对于日志重复数据删除等用例(结合下面提到的过滤重复项的选项),它可以比插入更新快得多。
插入也适用于这种用例,这种情况数据集可以允许重复项,但只需要Hudi的事务写/增量提取/存储管理功能。 - BULK_INSERT(批插入) :插入更新和插入操作都将输入记录保存在内存中,以加快存储优化启发式计算的速度(以及其它未提及的方面)。
所以对Hudi数据集进行初始加载/引导时这两种操作会很低效。批量插入提供与插入相同的语义,但同时实现了基于排序的数据写入算法,
该算法可以很好地扩展数百TB的初始负载。但是,相比于插入和插入更新能保证文件大小,批插入在调整文件大小上只能尽力而为。
DeltaStreamer
HoodieDeltaStreamer
实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。
- 从Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
增量导入 - 支持json、avro或自定义记录类型的传入数据
- 管理检查点,回滚和恢复
- 利用DFS或Confluent schema注册表的Avro模式。
- 支持自定义转换操作
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:///tmp/hudi-deltastreamer-op --target-table uber.impressions \
--op BULK_INSERT
Datasource Writer
hudi-spark
模块提供了DataSource API,可以将任何数据帧写入(也可以读取)到Hudi数据集中。
以下是在指定需要使用的字段名称的之后,如何插入更新数据帧的方法,这些字段包括recordKey => _row_key
、partitionPath => partition
和precombineKey => timestamp
inputDF.write()
.format("org.apache.hudi")
.options(clientOpts) // 可以传入任何Hudi客户端参数
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.mode(SaveMode.Append)
.save(basePath);
与Hive同步
上面的两个工具都支持将数据集的最新模式同步到Hive Metastore,以便查询新的列和分区。
如果需要从命令行或在独立的JVM中运行它,Hudi提供了一个HiveSyncTool
,
在构建了hudi-hive模块之后,可以按以下方式调用它。
cd hudi-hive
./run_sync_tool.sh
[hudi-hive]$ ./run_sync_tool.sh --help
Usage: <main class> [options]
Options:
* --base-path
Basepath of Hudi dataset to sync
* --database
name of the target database in Hive
--help, -h
Default: false
* --jdbc-url
Hive jdbc connect url
* --pass
Hive password
* --table
name of the target table in Hive
* --user
Hive username
删除数据
通过允许用户指定不同的数据记录负载实现,Hudi支持对存储在Hudi数据集中的数据执行两种类型的删除。
- Soft Deletes(软删除) :使用软删除时,用户希望保留键,但仅使所有其他字段的值都为空。
通过确保适当的字段在数据集模式中可以为空,并在将这些字段设置为null之后直接向数据集插入更新这些记录,即可轻松实现这一点。 - Hard Deletes(硬删除) :这种更强形式的删除是从数据集中彻底删除记录在存储上的任何痕迹。
这可以通过触发一个带有自定义负载实现的插入更新来实现,这种实现可以使用总是返回Optional.Empty作为组合值的DataSource或DeltaStreamer。
Hudi附带了一个内置的org.apache.hudi.EmptyHoodieRecordPayload
类,它就是实现了这一功能。
deleteDF // 仅包含要删除的记录的数据帧
.write().format("org.apache.hudi")
.option(...) // 根据设置需要添加HUDI参数,例如记录键、分区路径和其他参数
// 指定record_key,partition_key,precombine_fieldkey和常规参数
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
2.1 DeltaStreamer启动命令
spark-submit --master yarn \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--deploy-mode cluster \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.yarn.driver.memoryOverhead=512 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
--props hdfs://../kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path hdfs://../business \
--op UPSERT \
--target-table business \ '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
--enable-hive-sync \ '开启同步至hive'
--table-type MERGE_ON_READ \
--source-ordering-field create_time \
--source-limit 5000000
2.2 kafka.properties配置实例
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段
3.1.1 使用Spark查询
spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false '在进行快照视图查询的时候需要添加此配置'
#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
saprk sql不支持
3.1.2 使用Hive查询
beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx -p t3cx \
--hiveconf hive.stats.autogather=false \
#读优化查询
select * from dwd.test_ro;
#快照查询
select * from dwd.test_rt;
#增量查询
set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.test.consume.mode=INCREMENTAL;
set hoodie.test.consume.max.commits=3;
set hoodie.test.consume.start.timestamp=20200427114546;
select count(*) from dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
#注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定
4. 总结
DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。
三、使用Aapche Hudi整体思路
Hudi 提供了Hudi 表的概念,这些表支持CRUD操作。我们可以基于这个特点,将Mysql Binlog的数据重放至Hudi表,然后基于Hive对Hudi表进行查询分析。数据流向架构如下
binlog数据写入Hudi表
- binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit
--class com.niceshot.hudi.CanalKafkaImport2Hudi \
--name hudi__goods \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 40 \
--queue hudi \
--conf spark.executor.memoryOverhead=2048 \
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.locality.wait=100 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.receiver.maxRate=500 \
--conf spark.streaming.kafka.maxRatePerPartition=200 \
--conf spark.ui.retainedJobs=10 \
--conf spark.ui.retainedStages=10 \
--conf spark.ui.retainedTasks=10 \
--conf spark.worker.ui.retainedExecutors=10 \
--conf spark.worker.ui.retainedDrivers=10 \
--conf spark.sql.ui.retainedExecutions=10 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=20 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=8 \
/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
历史数据同步以及表元数据同步至hive
history_import_and_meta_sync
分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作
同步历史数据至hudi表
这里采用的思路是
- 将mysql全量数据通过注入sqoop等工具,导入到hive表。
- 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表
HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为
一个程序执行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar
--sync-hive-db-name hudi_temp
--sync-hive-table-name crm__wx_user_info
--base-save-path hdfs://192.168.2.2:8020/hudi_table/
--mapping-mysql-db-name crm
--mapping-mysql-table-name "order"
--primary-key "id"
--partition-key created_date
--hive-site-path /etc/lib/hive/conf/hive-site.xml
--tmp-data-path /data/tmp > order.log &
同步hudi表结构至hive meta
需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下
参数名 | 含义 | 是否必填 | 默认值 |
-hive-db-name | 指定hudi表同步至哪个hive数据库 | 是 | 无 |
-hive-table-name | 指定hudi表同步至哪个hive表 | 是 | 无 |
-hive-jdbc-url | 指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 无 |
-hive-user-name | 指定hive meta的链接用户名 | 否 | 默认hive |
-hive-pwd | 指定hive meta的链接密码 | 否 | 默认hive |
-hudi-table-path | 指定hudi表所在hdfs的文件路径 | 是 | 无 |
-hive-site-path | 指定hive的hive-site.xml路径 | 是 | 无 |
一个程序执行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar
--hive-db-name streaming
--hive-table-name crm__order
--hive-user-name hive
--hive-pwd hive
--hive-jdbc-url jdbc:hive2://192.168.16.181:10000
--hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order
--hive-site-path /lib/hive/conf/hive-site.xml
一些踩坑、hive相关配置
有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat
,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
实战 | 将Kafka流式数据摄取至Hudi
一. 启动kafka生产者造数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:13"}
注意:在任务跑起来以后,在不间断的发送一部分数据,测试
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:13"}
二. 使用Flink connector kafka 映射kafka topic
1.建表语句:
CREATE TABLE hudi_source(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
)WITH(
'connector' = 'kafka',
'topic' = 't_kafka_03',
'properties.bootstrap.servers' = 'node1:9092',
'scan.startup.mode'='earliest-offset',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
2.查询语句,看一下表中的数据有没有映射过来
select * from hudi_source;
三. 使用 hudi connector 创建hudi表
CREATE TABLE hudi_source_hudi(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/hudi_source_hudi',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'write.precombine.field' = 'log_ts',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
注意:
1.'write.precombine.field' = 'log_ts',
这个参数必须设置,可以将对同一条数据的多次记录压缩为1条记录.
2.'hoodie.datasource.write.recordkey.field' = 'user_id',
和直接设置主键的作用的一样的
四. 将kafka表的数据写入到hudi表中
insert into hudi_source_hudi select * from hudi_source;
五. 查询hudi表数据
select * from hudi_source_hudi;
看到如下的数据,说明数据已经同步过来了.
六. 在kafka的producer的发送数据,然后在客户端进行hudi表数据查询,发现数据都过来了.
bin/kafka-console-producer.sh --broker-list node1:9092 --topic t_kafka_03
{"user_id":"a2222","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a2222","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a2222","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a2222","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a2222","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a2222","order_amount":15.0,"log_ts":"2020-11-26 12:12:13"}
看到如下的数据,说明增量的数据已经同步过来了.
Flink sql 实现 将kafka 数据写入 Hudi
七. 在FlinkSQL客户端直接进行表关联
7.1 启动kafka生产者,生产数据
1.1 启动user生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user
{"user_id":"a0001","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a0002","order_amount":12.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a0003","order_amount":13.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a0004","order_amount":14.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a0005","order_amount":15.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a0006","order_amount":16.0,"log_ts":"2020-11-26 12:12:13"}
1.2 启动user_hobby生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user_hobby
{"user_id":"a0001","name":"yangge","hobby":"足球"}
{"user_id":"a0002","name":"baba","hobby":"电影"}
{"user_id":"a0003","name":"mama","hobby":"游戏"}
{"user_id":"a0004","name":"dudu","hobby":"动画片"}
{"user_id":"a0005","name":"gege","hobby":"手机"}
{"user_id":"a0006","name":"jiejie","hobby":"睡觉"}
7.2 在Flink SQL客户端创建对应的映射表
2.1 在Flink SQL客户端创建user表
CREATE TABLE user_ODS(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
)WITH(
'connector' = 'kafka',
'topic' = 'user',
'properties.bootstrap.servers' = 'node1:9092',
'scan.startup.mode'='earliest-offset',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
select * from user_ODS;
2.2 在flink SQL客户端创建user_hobby表
CREATE TABLE user_hobby_ODS(
user_id STRING,
name STRING,
hobby STRING
)WITH(
'connector' = 'kafka',
'topic' = 'user_hobby',
'properties.bootstrap.servers' = 'node1:9092',
'scan.startup.mode'='earliest-offset',
'properties.group.id' = 'testGroup',
'format' = 'json'
);
select * from user_hobby_ODS;
7.3 使用 hudi connector 创建hudi表
3.1 使用 hudi connector 创建hudi表
CREATE TABLE hudi_user(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/hudi_user',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'write.precombine.field' = 'log_ts',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
insert into hudi_user select * from user_ODS;
select * from hudi_user ;
select * from user_ODS;
3.2 使用 hudi connector 创建hudi表
CREATE TABLE hudi_user_hobby(
user_id STRING,
name STRING,
hobby STRING
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/hudi_user_hobby',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'write.precombine.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
insert into hudi_user_hobby select * from user_hobby_ODS;
select * from hudi_user_hobby;
7.4 使用 hudi connector 创建hudi DWD表
4.1 在Flink SQL 创建DWD输出表
CREATE TABLE user_hobby_DWD(
user_id STRING,
name STRING,
hobby STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
)WITH(
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/user_hobby_DWD',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'write.precombine.field' = 'user_id',
'compaction.async.enabled' = 'false'
);
insert into user_hobby_DWD
select
A.user_id,
B.name,
B.hobby,
A.order_amount,
A.log_ts
from
(select * from hudi_user) A
join
(select * from hudi_user_hobby) B
on A.user_id = B.user_id;
注意事项:字段的顺序和最终写入表的字段顺序必须一致,不一致会报错.
出现这样的结果,说明join完成.