// 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/weixin_39478115/article/details/83182578
// 课程链接: http://edu.51cto.com/course/15174.html
package src.main.java.org.kududb.examples.sample;
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* 数据刷新策略对比
*/
public class InsertFlushData {
// 缓冲大小,也就是数据的条数
private final static int OPERATION_BATCH = 2000;
/**
* mode形式:
* SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
* SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
* SessionConfiguration.FlushMode.MANUAL_FLUSH
*/
// 支持三个模式的测试用例
public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode, int recordCount) throws Exception {
session.setFlushMode(mode);
if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
session.setMutationBufferSpace(OPERATION_BATCH);
}
int commit = 0;
for (int i = 0; i < recordCount; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
UUID uuid = UUID.randomUUID();
row.addString("id", uuid.toString());
row.addInt("value1", 16);
row.addLong("value2", 16);
Long gtmMillis;
Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
gtmMillis = localTimestamp.getTime();
Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
row.addLong("timestamp", shanghaiTimezoneMillis);
session.apply(insert);
if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
commit = commit + 1;
if (commit > OPERATION_BATCH / 2) {
session.flush();
commit = 0;
}
}
}
// 对于手工提交, 保证完成最后的提交
if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && commit > 0) {
session.flush();
}
// 对于后台自动提交, 必须保证完成最后的提交, 并保证有错误时能抛出异常
if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
session.flush();
RowErrorsAndOverflowStatus error = session.getPendingErrors();
// 检查错误收集器是否有溢出和是否有行错误
if (error.isOverflowed() || error.getRowErrors().length > 0) {
if (error.isOverflowed()) {
throw new Exception("kudu overflow exception occurred.");
}
StringBuilder errorMessage = new StringBuilder();
if (error.getRowErrors().length > 0) {
for (RowError errorObj : error.getRowErrors()) {
errorMessage.append(errorObj.toString());
errorMessage.append(";");
}
}
throw new Exception(errorMessage.toString());
}
}
}
// 支持手动flush的测试用例
public static void insertTestManualFlush(KuduSession session, KuduTable table, int recordCount) throws Exception {
SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
session.setFlushMode(mode);
session.setMutationBufferSpace(OPERATION_BATCH);
int commit = 0;
for (int i = 0; i < recordCount; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
UUID uuid = UUID.randomUUID();
row.addString("id", uuid.toString());
row.addInt("value1", 17);
row.addLong("value2", 17);
Long gtmMillis;
Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
gtmMillis = localTimestamp.getTime();
Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
row.addLong("timestamp", shanghaiTimezoneMillis);
session.apply(insert);
// 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
commit = commit + 1;
if (commit > OPERATION_BATCH / 2) {
session.flush();
commit = 0;
}
}
// 对于手工提交, 保证完成最后的提交
if (commit > 0) {
session.flush();
}
}
// 自动flush的测试案例
public static void insertTestAutoFlushSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
session.setFlushMode(mode);
for (int i = 0; i < recordCount; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
UUID uuid = UUID.randomUUID();
row.addString("id", uuid.toString());
row.addInt("value1", 18);
row.addLong("value2", 18);
Long gtmMillis;
/**
* System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 所以需要时间*1000
* 另外, 考虑到我们是东8区时间, 所以转成Long型需要再加8个小时, 否则存到Kudu的时间是GTM, 比东8区晚8个小时
*/
// 第一步: 获取当前时间对应的GTM时区unix毫秒数
gtmMillis = System.currentTimeMillis();
// 第二步: 将timestamp转成对应的GTM时区unix毫秒数
Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
gtmMillis = localTimestamp.getTime();
// 将GTM的毫秒数转成东8区的毫秒数量
Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
row.addLong("timestamp", shanghaiTimezoneMillis);
// 对于AUTO_FLUSH_SYNC模式, apply()将立即完成数据写入,但是不是批处理
session.apply(insert);
}
}
/**
* 测试案例
*/
public static void testStrategy() throws KuduException {
KuduClient client = new KuduClient.KuduClientBuilder("hadoop01").build();
KuduSession session = client.newSession();
KuduTable table = client.openTable("bigData2");
SessionConfiguration.FlushMode mode;
long d1;
long d2;
long timeMillis;
long seconds;
int recordCount = 200000;
try {
// 自动刷新策略(默认的刷新策略)
mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
System.out.println(mode + " is start!");
d1 = System.currentTimeMillis();
insertTestAutoFlushSync(session, table, recordCount);
d2 = System.currentTimeMillis();
timeMillis = d2 - d1;
System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
// 后台刷新策略
mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
System.out.println(mode + " is start!");
d1 = System.currentTimeMillis();
insertTestGeneric(session, table, mode, recordCount);
d2 = System.currentTimeMillis();
timeMillis = d2 - d1;
System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
// 手动刷新
mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
System.out.println(mode + " is start!");
d1 = System.currentTimeMillis();
insertTestManualFlush(session, table, recordCount);
d2 = System.currentTimeMillis();
timeMillis = d2 - d1;
System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (!session.isClosed()) {
session.close();
}
}
}
public static void createTable() {
String tableName = "bigData2";
KuduClient client = new KuduClient.KuduClientBuilder("hadoop01").defaultAdminOperationTimeoutMs(60000).build();
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
try {
// 测试,如果table存在的情况下,就删除该表
if (client.tableExists(tableName)) {
client.deleteTable(tableName);
System.out.println("delete the table is success!");
}
List<ColumnSchema> columns = new ArrayList();
// 创建列
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value1", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("value2", Type.INT64).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.INT64).key(true).build());
// 创建schema
Schema schema = new Schema(columns);
// id相当于联合主键
ImmutableList<String> hashKeys = ImmutableList.of("id", "timestamp");
CreateTableOptions tableOptions = new CreateTableOptions();
// 设置hash分区,包括分区数量、副本数目
tableOptions.addHashPartitions(hashKeys, 20);
tableOptions.setNumReplicas(1);
System.out.println("create the table is success! ");
// 创建table,并设置partition
client.createTable(tableName, schema, tableOptions);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
createTable();
testStrategy();
} catch (KuduException e) {
e.printStackTrace();
}
}
}