import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object App {
  private val MASTER = "local[2]"
  val SOURCE = "hudi"

  case class TemperatureBean(id: String, flag: Int, temperature: Double, time: String)

  // Sample records (named *Records so they do not clash with the insertData/upsertData
  // methods below; a Scala object cannot hold a val and a def with the same name).
  val insertRecords = Array(
    TemperatureBean("4301", 1, 28.6, "2019-12-07 12:35:33"),
    TemperatureBean("4312", 0, 31.4, "2019-12-07 12:25:03"),
    TemperatureBean("4302", 1, 30.1, "2019-12-07 12:32:17"),
    TemperatureBean("4305", 3, 31.5, "2019-12-07 12:33:11"),
    TemperatureBean("4310", 2, 29.9, "2019-12-07 12:34:42")
  )
  val updateRecords = Array(
    // Sensor 4310 changed: temperature 29.9 -> 30.4, time 12:34:42 -> 12:35:42
    TemperatureBean("4310", 2, 30.4, "2019-12-07 12:35:42")
  )
  val deleteRecords = Array(
    // Sensor 4310 is to be deleted; the record must match the latest data
    // (the delete does not take effect if the field values differ)
    TemperatureBean("4310", 2, 30.4, "2019-12-07 12:35:42")
  )
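  // deleteRecords above is never written by the methods below. A minimal sketch of a
  // delete write, assuming a pre-0.9 Hudi where DataSourceWriteOptions exposes
  // OPERATION_OPT_KEY / DELETE_OPERATION_OPT_VAL: the incoming rows must carry the same
  // record key and partition path as the latest data for the delete to take effect.
  def deleteData(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val result = deleteRecords.toSeq.toDF()
      .withColumn("ts", lit(System.currentTimeMillis().toString))
      .withColumn("hudipartition", col("time").substr(1, 10))
      .withColumn("uuid", col("id"))
    result.write.format("hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "test_table")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .mode(SaveMode.Append)
      .save("/user/root/hudiparquet2")
  }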
  def insertData(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val createTime = System.currentTimeMillis().toString
    // textFile yields a Dataset[String]; read.text would yield Rows whose toString
    // wraps the line in brackets and breaks the split
    val df: DataFrame = spark.read.textFile("/user/root/data/input/temperature.txt")
      .mapPartitions(partition => {
        partition.map(line => {
          val items: Array[String] = line.split(",")
          TemperatureBean(items(0), items(1).toInt, items(2).toDouble, items(3))
        })
      }).toDF()
    val result = df.withColumn("ts", lit(createTime))
      .withColumn("hudipartition", col("time").substr(1, 10)) // yyyy-MM-dd
      .withColumn("uuid", col("id"))
    result.write.format("hudi")
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "test_table")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .mode(SaveMode.Overwrite)
      .save("/user/root/hudiparquet2")
  }
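  // Every Hudi write also adds the meta columns _hoodie_commit_time, _hoodie_commit_seqno,
  // _hoodie_record_key, _hoodie_partition_path and _hoodie_file_name, so readData below
  // shows them alongside the temperature fields.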
  // Upsert: rows whose record key already exists in the partition are updated, keeping
  // the row with the larger precombine value (ts); new keys are inserted.
  def upsertData(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val createTime = System.currentTimeMillis().toString
    val df: DataFrame = spark.read.textFile("/user/root/data/input/temperature3.txt")
      .mapPartitions(partition => {
        partition.map(line => {
          val items: Array[String] = line.split(",")
          TemperatureBean(items(0), items(1).toInt, items(2).toDouble, items(3))
        })
      }).toDF()
    val result = df.withColumn("ts", lit(createTime))
      .withColumn("hudipartition", col("time").substr(1, 10))
      .withColumn("uuid", col("id"))
    result.write.format("hudi")
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "test_table")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .mode(SaveMode.Append)
      .save("/user/root/hudiparquet2")
  }
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster(MASTER)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Hudi requires Kryo
    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://node1:8020")
    sc.hadoopConfiguration.set("dfs.nameservices", "node1")
    insertData(spark)
    // readData(spark)
    // incrementReadData(spark)
    // upsertData(spark)
    // insertHiveData(spark)
    // upsertHiveData(spark)
  }
  // Snapshot query: reads the latest committed view of the table
  def readData(spark: SparkSession) = {
    spark.read.format("hudi").load("/user/root/hudiparquet2/*/*").show()
  }

  // Incremental query: returns only records committed after beginTime
  def incrementReadData(spark: SparkSession) = {
    val beginTime = "20210905081108" // commit instant, yyyyMMddHHmmss
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .load("/user/root/hudiparquet2")
      .show()
  }

  // Point-in-time query: begin == end pins the incremental read to a single commit
  def optimizeReadData(spark: SparkSession) = {
    val beginTime = "20210905081108"
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, beginTime)
      .load("/user/root/hudiparquet2")
      .show()
  }
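  // For the MOR table written by insertHiveData below, a read-optimized query serves only
  // the compacted base files and skips the log files. A minimal sketch, assuming the
  // pre-0.9 option names:
  def readOptimizedData(spark: SparkSession) = {
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load("/user/root/hudihiveparquet_mor2/*/*")
      .show()
  }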
  def insertHiveData(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val createTime = System.currentTimeMillis().toString
    val df: DataFrame = spark.read.textFile("/user/root/data/input/temperature.txt")
      .mapPartitions(partition => {
        partition.map(line => {
          val items: Array[String] = line.split(",")
          TemperatureBean(items(0), items(1).toInt, items(2).toDouble, items(3))
        })
      }).toDF()
    val result = df.withColumn("ts", lit(createTime))
      .withColumn("hudipartition", col("time").substr(1, 10))
      .withColumn("uuid", col("id"))
    // COPY ON WRITE
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "hudi_hive_test_table_cow2")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://node3:10000")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_cow2")
      // the synced Hive partition column must match the Hudi partition path field
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "hudipartition")
      // the default extractor expects yyyy/mm/dd partition paths; this one takes the value as-is
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .mode(SaveMode.Overwrite)
      .save("/user/root/hudihiveparquet_cow2")
    // MERGE ON READ
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "hudi_hive_test_table_mor2")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://node3:10000")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_mor2")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "hudipartition")
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .mode(SaveMode.Overwrite)
      .save("/user/root/hudihiveparquet_mor2")
  }
  def upsertHiveData(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._
    val createTime = System.currentTimeMillis().toString
    val df: DataFrame = spark.read.textFile("/user/root/data/input/temperature3.txt")
      .mapPartitions(partition => {
        partition.map(line => {
          val items: Array[String] = line.split(",")
          TemperatureBean(items(0), items(1).toInt, items(2).toDouble, items(3))
        })
      }).toDF()
    val result = df.withColumn("ts", lit(createTime))
      .withColumn("hudipartition", col("time").substr(1, 10))
      .withColumn("uuid", col("id"))
    // COPY ON WRITE
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "hudi_hive_test_table_cow2")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://node3:10000")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_cow2")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "hudipartition")
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .mode(SaveMode.Append)
      .save("/user/root/hudihiveparquet_cow2")
    // MERGE ON READ
    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 12)
      .option("hoodie.upsert.shuffle.parallelism", 12)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
      .option("hoodie.table.name", "hudi_hive_test_table_mor2")
      .option("hoodie.datasource.write.partitionpath.field", "hudipartition")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://node3:10000")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_mor2")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "hudipartition")
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .mode(SaveMode.Append)
      .save("/user/root/hudihiveparquet_mor2")
  }
}
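The option constants used above (QUERY_TYPE_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY and friends) are the pre-0.9 Hudi names, so the example needs a matching bundle on the classpath. Assuming Spark 2.4 with Scala 2.11 and Hudi 0.8, launching with --packages org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.spark:spark-avro_2.11:2.4.4 (plus the Kryo serializer setting already present in main) should work; newer Hudi releases renamed these constants.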
Reading a local Windows file:
package com.asinking.ch.ods;

import com.asinking.utils.ClickHouseUtil;
import com.asinking.utils.HudiConfigUtil;
import com.asinking.utils.SparkUtils;
import org.apache.spark.sql.*;

import java.text.SimpleDateFormat;
import java.util.Date;
public class ParquetTest {
    // Hudi table name / job name
    static final String HUDI_TABLE_NAME = "ods_sp_shipment_event";
    // event config table name
    static final String EVENT_CONF_TABLE = "sp_shipment_event_conf";
    // primary key
    static final String PRIMARY_KEY = "country_id";
    // partition key
    static final String PARTITION_KEY = "seller_id,event_group_id,country_code";
    // Hudi table path (on S3 in production; a local Windows path is used here for testing)
    static final String HUDI_PATH = "file:///C:\\Users\\12505\\Desktop\\ods_sp_shipment_event_test1";
    public static void main(String[] args) {
        SparkSession sparkSession = SparkUtils.sparkSession(SparkUtils.sparkConf("test"));
        Dataset<Row> ds = sparkSession.read()
                .parquet("file:///C:\\Users\\12505\\Desktop\\fcc20210913195611.parquet");
        ds.show();
        // commit-style timestamp used as the precombine value ("SSS" = milliseconds)
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
        Dataset<Row> rowDataset = ds.withColumn("ts", functions.lit(sdf.format(new Date())));
        rowDataset.show();
        // print the ClickHouse CREATE TABLE statement derived from the parquet schema
        System.out.println(ClickHouseUtil.getTableSql("logistics.ods_sp_shipment_event", ds.schema()));
        // write to Hudi
        rowDataset.write().format("org.apache.hudi")
                .options(HudiConfigUtil.getHudiConfig(HUDI_TABLE_NAME, "ts", PARTITION_KEY, "ts"))
                .mode(SaveMode.Append)
                .save(HUDI_PATH);
    }
}