Flink写入Doris的实时应用

Flink写入Doris的实时应用

想深入交流doris的私聊我,加微信

引言

做实时数仓的同学对目前比较流行的KFC(Kafka\Flink\ClickHouse)套餐非常熟悉,其实KFD也不错。
大数据组件越来越丰富,但是还没有出现一个兼容OLAP和OLTP的工具,即满足DB和日志的实时存储和复杂查询,又能满足在此基础上的数仓建设,我们尝试过ClickHouse,缺点在于难维护、实时写入效率低,内部碎片合并和数据走zk难以实现大量数据的实时存储;之后使用过impala+kudu,缺点是impala实在是太占用内存,两者结合用起来比较费劲,也是开发了实时同步DB的工具,维护成本太高,也放弃了;最终在参考百度的doris和作业帮的资料下,正式的开始使用Doris,实现了log和DB(包含分表合并)的准实时同步,以及基于doris的数仓建模。
接下来我会就Doris的实时写入部分简单的说一下实现方式,代码和注释为主

表设计

不要把字段设计成"not null",好处在于后期改表(加字段)不会影响正常的数据,其他的暂时不方便透露,之后会慢慢讲

JSONStreamLoad

为什么选择StreamLoad呢?一开始使用的是insert into,insert into是使用的FE资源的,导致FE繁忙,后期数据量上来会出问题,而streamload不存在这个问题,官方是这么说的(0.12的文档):

Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。

用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。

导入的最终结果由 Coordinator BE 返回给用户。

                         ^      +
                         |      |
                         |      | 1A. User submit load to FE
                         |      |
                         |   +--v-----------+
                         |   | FE           |
 - Return result to user |   +--+-----------+
                         |      |
                         |      | 2. Redirect to BE
                         |      |
                         |   +--v-----------+
                         +---+Coordinator BE| 1B. User submit load to BE
                             +-+-----+----+-+
                               |     |    |
                         +-----+     |    +-----+
                         |           |          | 3. Distrbute data
                         |           |          |
                       +-v-+       +-v-+      +-v-+
                       |BE |       |BE |      |BE |
                       +---+       +---+      +---+

之后参考京东做法,不断的load小文件实现,实时的数据插入。
踩坑:

  • 尽量的数据量要大,避免多次提交,而会出现线程占有的问题
  • load是以DB为单位的,一个DB默认100个线程,控制好load的线程数
  • load是很耗内存的,一是线程,二是数据合并
  • streaming_load_max_batch_size_mb默认是100,根据业务进行更改
  • 如果要同步DB的数据注意多线程执行curl

实现起来比较简单,无非是在flinkSink代码中嵌入一段执行curl的代码

## 原curl
curl --location-trusted -u 用户名:密码 -T /xxx/test -H "format: json" -H "strip_outer_array: true" http://doris_fe:8030/api/{
    
    database}/{
    
    table}/_stream_load
## -u 不用解释了,用户名和密码
## -T json文件的地址,内容为[json,json,json],就是jsonlist
## -H 指定参数
## http 指定库名和表名

步骤:生成临时文件createFile,将数据写入临时文件mappedFile,执行execCurl, 删除临时文件deleteFile (简化版)


    /**
     * 创建临时内存文件
     * @param fileName
     * @throws IOException
     */
    public static void createFile(String fileName) throws IOException {
    
    

        File testFile = new File(fileName);
        File fileParent = testFile.getParentFile();

        if (!fileParent.exists()) {
    
    
            fileParent.mkdirs();
        }
        if (!testFile.exists())
            testFile.createNewFile();
    }

    /**
     * 删除临时内存文件
     * @param fileName
     * @return
     */
    public static boolean deleteFile(String fileName) {
    
    
        boolean flag = false;
        File file = new File(fileName);
        // 路径为文件且不为空则进行删除
        if (file.isFile() && file.exists()) {
    
    
            file.delete();
            flag = true;
        }
        return flag;
    }
    
    /**
     * 写入内存文件
     * @param data
     * @param path
     */
    public static void mappedFile(String data, String path) {
    
    

        CharBuffer charBuffer = CharBuffer.wrap(data);

        try {
    
    
            FileChannel fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING);

            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.getBytes().length*4);

            if (mappedByteBuffer != null) {
    
    
                mappedByteBuffer.clear();
                mappedByteBuffer.put(Charset.forName("UTF-8").encode(charBuffer));
            }
            fileChannel.close();
        } catch (IOException e) {
    
    
            e.printStackTrace();
        }

    }

    /**
     * 执行curl
     * @param curl
     * @return
     */
    public static String execCurl(String[] curl) {
    
    

        ProcessBuilder process = new ProcessBuilder(curl);
        Process p;
        try {
    
    
            p = process.start();
            BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
            StringBuilder builder = new StringBuilder();
            String line = null;
            while ((line = reader.readLine()) != null) {
    
    
                builder.append(line);
                builder.append(System.getProperty("line.separator"));
            }
            return builder.toString();

        } catch (IOException e) {
    
    
            System.out.print("error");
            e.printStackTrace();
        }
        return null;

    }
    /**
     * 生成Culr
     * @param filePath
     * @param databases
     * @param table
     * @return
     */
    public static String[] createCurl(String filePath, String databases, String table){
    
    
        String[] curl = {
    
    "curl","--location-trusted", "-u", "用户名:密码", "-T",filePath, "-H","format: json", "-H", "strip_outer_array: true", "http://doris_fe:8030/api/"+databases+"/"+table+"/_stream_load"};
        
        return curl;
    }

flink

实现自定义Sink比较简单,这里就简单的分享一下我的怎么写的(简化版)。

class LogCurlSink(insertTimenterval:Long,
                  insertBatchSize:Int) extends RichSinkFunction[(String, Int, Long, String)] with Serializable{
    
    
  private val Logger = LoggerFactory.getLogger(this.getClass)
  private val mesList = new java.util.ArrayList[String]()
  private var lastInsertTime = 0L
  
  override def open(parameters: Configuration): Unit ={
    
    
    val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
    CurlUtils.createFile(path)
    Logger.warn(s"init and create $topic filePath!!!")
  }
  
  	// (topic,partition,offset,jsonstr)
   override def invoke(value: (String, Int, Long, String), context: SinkFunction.Context[_]): Unit = {
    
    
    if(mesList.size >= this.insertBatchSize || isTimeToDoInsert){
    
    
      //存入
      insertData(mesList)
      //此处可以进行受到维护offset
      mesList.clear()
      this.lastInsertTime = System.currentTimeMillis()
    }
    mesList.add(value._4)
  }
  
  
  override def close(): Unit = {
    
    
    val path = s"/tmp/doris/{databases}/{table}/{ThreadId}"
    CurlUtils.deleteFile(path)
    Logger.warn("close and delete filePath!!!")
  }

  /**
    * 执行插入操作
    * @param dataList
    */
  private def insertData(dataList: java.util.ArrayList[String]): Unit ={
    
    }
  /**
    * 根据时间判断是否插入数据
    *
    * @return
    */
  private def isTimeToDoInsert = {
    
    
    val currTime = System.currentTimeMillis
    currTime - this.lastInsertTime >= this.insertCkTimenterval
  }

}

猜你喜欢

转载自blog.csdn.net/jklcl/article/details/112851685