应用一:kafka数据同步到kudu
1 准备kafka topic
# bin/kafka-topics.sh --zookeeper $zk:2181/kafka --create --topic test_sync --partitions 2 --replication-factor 2 WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "test_sync". # bin/kafka-topics.sh --zookeeper $zk:2181/kafka --describe --topic test_sync Topic:test_sync PartitionCount:2 ReplicationFactor:2 Configs: Topic: test_sync Partition: 0 Leader: 112 Replicas: 112,111 Isr: 112,111 Topic: test_sync Partition: 1 Leader: 110 Replicas: 110,112 Isr: 110,112
2 准备kudu表
impala-shell
CREATE TABLE test.test_sync ( id int, name string, description string, create_time timestamp, update_time timestamp, primary key (id) ) PARTITION BY HASH (id) PARTITIONS 4 STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='$kudu_master:7051');
3 准备flume kudu支持
3.1 下载jar
# wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-flume-sink/1.7.0-cdh5.16.1/kudu-flume-sink-1.7.0-cdh5.16.1.jar # mv kudu-flume-sink-1.7.0-cdh5.16.1.jar $FLUME_HOME/lib/ # wget https://repo1.maven.org/maven2/org/json/json/20160810/json-20160810.jar # mv json-20160810.jar $FLUME_HOME/lib/
3.2 开发
kudu-flume-sink不支持json格式数据,如果要支持需要二次开发
https://github.com/apache/kudu/tree/master/java/kudu-flume-sink
package com.cloudera.kudu; public class JsonKuduOperationsProducer implements KuduOperationsProducer {
以下代码有几个缺点:不允许 null,并且所有的值必须是 string,再由 sink 根据 kudu 表中字段的类型进行解析;在生成数据时需要注意这一点,否则需要自行修改代码。
借用他人的代码:
JsonKuduOperationsProducer.java
package com.cloudera.kudu; import com.google.common.collect.Lists; import com.cloudera.utils.JsonStr2Map; import com.google.common.base.Preconditions; import org.json.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; import org.apache.kudu.flume.sink.KuduOperationsProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.List; import java.util.Map; /** * package: com.cloudera.kudu * describe: 自定义的KuduSink用于解析JSON格式数据 * creat_user: Fayson * email: [email protected] * creat_date: 2018/6/2 * creat_time: 下午11:07 * 公众号:Hadoop实操 */ @InterfaceAudience.Public @InterfaceStability.Evolving public class JsonKuduOperationsProducer implements KuduOperationsProducer { private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class); private static final String INSERT = "insert"; private static final String UPSERT = "upsert"; private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT); public static final String ENCODING_PROP = "encoding"; public static final String DEFAULT_ENCODING = "utf-8"; public static final String OPERATION_PROP = "operation"; public static final String DEFAULT_OPERATION = UPSERT; public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn"; public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false; public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue"; public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false; public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows"; public static final 
boolean DEFAULT_WARN_UNMATCHED_ROWS = true; private KuduTable table; private Charset charset; private String operation; private boolean skipMissingColumn; private boolean skipBadColumnValue; private boolean warnUnmatchedRows; public JsonKuduOperationsProducer() { } @Override public void configure(Context context) { String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING); try { charset = Charset.forName(charsetName); } catch (IllegalArgumentException e) { throw new FlumeException( String.format("Invalid or unsupported charset %s", charsetName), e); } operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(); Preconditions.checkArgument( validOperations.contains(operation), "Unrecognized operation '%s'", operation); skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP, DEFAULT_SKIP_MISSING_COLUMN); skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP, DEFAULT_SKIP_BAD_COLUMN_VALUE); warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP, DEFAULT_WARN_UNMATCHED_ROWS); } @Override public void initialize(KuduTable table) { this.table = table; } @Override public List<Operation> getOperations(Event event) throws FlumeException { String raw = new String(event.getBody(), charset); logger.debug("get raw:" + raw); logger.error("get raw:" + raw); Map<String, String> rawMap = JsonStr2Map.jsonStr2Map(raw); Schema schema = table.getSchema(); List<Operation> ops = Lists.newArrayList(); if(raw != null && !raw.isEmpty()) { Operation op; switch (operation) { case UPSERT: op = table.newUpsert(); break; case INSERT: op = table.newInsert(); break; default: throw new FlumeException( String.format("Unrecognized operation type '%s' in getOperations(): " + "this should never happen!", operation)); } PartialRow row = op.getRow(); for (ColumnSchema col : schema.getColumns()) { try { coerceAndSet(rawMap.get(col.getName()), col.getName(), col.getType(), row); } catch (NumberFormatException e) { String msg = String.format( 
"Raw value '%s' couldn't be parsed to type %s for column '%s'", raw, col.getType(), col.getName()); logOrThrow(skipBadColumnValue, msg, e); } catch (IllegalArgumentException e) { String msg = String.format( "Column '%s' has no matching group in '%s'", col.getName(), raw); logOrThrow(skipMissingColumn, msg, e); } catch (Exception e) { throw new FlumeException("Failed to create Kudu operation", e); } } ops.add(op); } return ops; } /** * Coerces the string `rawVal` to the type `type` and sets the resulting * value for column `colName` in `row`. * * @param rawVal the raw string column value * @param colName the name of the column * @param type the Kudu type to convert `rawVal` to * @param row the row to set the value in * @throws NumberFormatException if `rawVal` cannot be cast as `type`. */ private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row) throws NumberFormatException { switch (type) { case INT8: row.addByte(colName, Byte.parseByte(rawVal)); break; case INT16: row.addShort(colName, Short.parseShort(rawVal)); break; case INT32: row.addInt(colName, Integer.parseInt(rawVal)); break; case INT64: row.addLong(colName, Long.parseLong(rawVal)); break; case BINARY: row.addBinary(colName, rawVal.getBytes(charset)); break; case STRING: row.addString(colName, rawVal==null?"":rawVal); break; case BOOL: row.addBoolean(colName, Boolean.parseBoolean(rawVal)); break; case FLOAT: row.addFloat(colName, Float.parseFloat(rawVal)); break; case DOUBLE: row.addDouble(colName, Double.parseDouble(rawVal)); break; case UNIXTIME_MICROS: row.addLong(colName, Long.parseLong(rawVal)); break; default: logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName); } } private void logOrThrow(boolean log, String msg, Exception e) throws FlumeException { if (log) { logger.warn(msg, e); } else { throw new FlumeException(msg, e); } } @Override public void close() { } }
JsonStr2Map.java
package com.cloudera.utils; import org.json.JSONObject; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * package: com.cloudera.utils * describe: Json字符串转Map * creat_user: Fayson * email: [email protected] * creat_date: 2018/6/2 * creat_time: 下午11:19 * 公众号:Hadoop实操 */ public class JsonStr2Map { /** * 将Json字符串转为Map对象 * @param jsonStr * @return */ public static Map<String, String> jsonStr2Map(String jsonStr) { Map<String, String> map = new HashMap<String, String>(); JSONObject json = new JSONObject(jsonStr); Object aObj ; for(String k : json.keySet()){ try{ aObj = json.get(k); if(aObj instanceof Double){ map.put(k, "" + json.getDouble(k)); }else{ map.put(k, json.getString(k)); } }catch(Exception e){ } } return map; } }
详见:https://cloud.tencent.com/developer/article/1158194
打包放到$FLUME_HOME/lib下
4 准备flume conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.1:9092
a1.sources.r1.kafka.topics = test_sync
a1.sources.r1.kafka.consumer.group.id = flume-consumer

# Memory channel buffering events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# Kudu sink using the custom JSON operations producer.
# (The template's leftover "a1.sinks.k1.type = logger" line was removed:
# it was dead config, silently overridden by the KuduSink type below.)
a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
a1.sinks.k1.producer = com.cloudera.kudu.JsonKuduOperationsProducer
a1.sinks.k1.masterAddresses = 192.168.0.1:7051
a1.sinks.k1.tableName = impala::test.test_sync
a1.sinks.k1.batchSize = 50

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5 启动flume
bin/flume-ng agent --conf conf --conf-file conf/order.properties --name a1
6 kudu确认
impala-shell
select * from test_sync limit 10;
参考:https://kudu.apache.org/2016/08/31/intro-flume-kudu-sink.html