ORCfile Sink开发中遇到的坑

最近做了一个基于hdfs 的sink,用于写hdfs orc文件,中间遇到了几个坑,下面把思路和遇到的问题一一记录下来。

1. 开发思路
首先的实现场景是这样的:从channel拿数据-->sink拿到数据后做分类-->分类后将数据写入对应的orc文件->文件关闭。技术要点是这样:

1.1  线程池管理hdfs操作
在分类写orc文件这环节,我开了两个线程池,一个用来管理每类文件的hdfs操作,创建/写入/关闭hdfs上的orc文件;一个用来管理文件的滚动,在某一个时机,例如文件写入条数到达某个上限或者文件闲置到达一定时间,将当前文件关闭并创造新文件。

这个是hdfs操作线程池的代码逻辑:

//创建一个线程池,线程执行对象
callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
//callTimeoutPool执行线程任务,任务继承于callable,执行后返回Future对象
private <T> T callWithTimeout(final CallRunner<T> callRunner)
      throws IOException, InterruptedException {
    Future<T> future = callTimeoutPool.submit(new Callable<T>() {
      ...
    } 
}
 
//任务内容:将一条消息写到orc文件中
callWithTimeout(new CallRunner<Void>() {
        @Override
        public Void call() throws Exception {
          writer.append(message); //hdfs操作:写数据
          return null;
        }
      });
1.2 写orc文件的实现
数据已经分好类了,那么拿到的数据怎么转换成orc格式写到hdfs上呢?首先,创建的orc文件时需要TypeDescription用于描述orc文件的字段结构,然后创建一个batch用于按批次把记录写入orc文件,这一步比较简单,网上很多例子,就不详细描述了。

import org.apache.orc.TypeDescription;
import org.apache.orc.OrcFile.*;
import org.apache.orc.Writer;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
...
TypeDescription schema = TypeDescription.createStruct();
VectorizedRowBatch batch = schema.createRowBatch(maxFlushSize);
ORCFile.writerOptions writerOptions = ORCFile.writerOptions(conf).setSchema(schema);
Writer writer = ORCFile.createWriter(destPath, writerOptions);
... //add message into batch
writer.addRowBatch(batch);
其次orc的主要实现在WriterImpl和MemoryManagerImpl类。WriterImpl的addRowBatch方法会将写入的记录数传到MemoryManagerImpl中,MemoryManagerImpl的addedRow方法会累计写入的记录条数;当条数大于5000,回调WriterImpl的checkMemory,检查写入的记录占用字节数,当大于orc单个压缩块大小时(默认64M),才真正将数据flush到hdfs上。

//WriterImpl.java
 
public void addRowBatch(VectorizedRowBatch batch) throws IOException {
    ... //load message to orc
    memoryManager.addedRow(batch.size);
}
 
public boolean checkMemory(double newScale) throws IOException {
    long limit = (long) Math.round(adjustedStripeSize * newScale);
    long size = treeWriter.estimateMemory();
    if (LOG.isDebugEnabled()) {
      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
          " limit = " + limit);
    }
    if (size > limit) {
      flushStripe();  //flush data to hdfs
      return true;
    }
    return false;
  }
//MemoryManagerImpl.java
 
public void addedRow(int rows) throws IOException {
    rowsAddedSinceCheck += rows;
    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {  //ROWS_BETWEEN_CHECKS = 5000
      notifyWriters();
    }
  }
 
public void notifyWriters() throws IOException {
    ...
    for(WriterInfo writer: writerList.values()) {
      boolean flushed = writer.callback.checkMemory(currentScale);  // call back
      ...
    }
 }
1.3 第一个坑
到了这里,就遇到了第一个坑。熟悉flume的人应该知道(或者看上一篇文),事务是保证flume可靠性一个机制。

在sink中,每批消息在结束处理之后,如果没有发生意外,就要进行事务确认。这一步通俗来说,是告诉channel我这批数据已经好好的放到sink里了,无论程序再发生什么问题,这批处理的数据都不会被影响了,就可以把这批数据在channel中的缓存删掉了。

//sink.java
public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();  //开始事务
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
       ...  //handle messages
    }
    transaction.commit(); //确认事务
}
但是,如果写的是orc文件,你会发现在事务确认之后,这批数据很有可能还在内存里。因为1.2所说的步骤,每个writer必须存满5000条数据以上,然后字节达到一个压缩块的大小以上,才能被写到orc文件上。如果按照原有逻辑,flume被强制kill掉(正常关闭不会丢数据),被确认的那批数据如果还在内存中,那么这批数据就完全丢失了,也就破坏了flume的可靠性。

这个问题的确困扰了我一段时间。后来的方法,是调大每个批次的最大记录数,去掉5000条判断的逻辑,开放压缩块大小限制,使每一批次的提交都能完整flush到文件中。不过这个方法只是一个治标不治本的方法,不知道是否还有更好方法呢?
 

猜你喜欢

转载自blog.csdn.net/wjandy0211/article/details/93197040