Spark处理时间序列数据

整个逻辑为: 设备端毫秒级生成数据以文本方式传输到HDFS,然后通过SPARK解析文件并存储到HBASE中。

问题1:SPARK本身为分布式,如果通过分布式解析文件并存储,如何保证数据系列顺序?

问题2:使用SPARK进行HBASE插入的时候,使用RddPartitionForeach,然后给每条记录生成时间戳,并插入到HBASE,发现即使是微妙级别,在foreach的时候仍然会出现时间相同的情况,这样会导致Rowkey相同。

整个代码如下:

try {
      bankRDD.foreachPartition { x =>
        val hbaseconf = HBaseConfiguration.create()
        hbaseconf.set("hbase.zookeeper.quorum", "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com")
        hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
        hbaseconf.set("maxSessionTimeout", "6")
        val conn = ConnectionFactory.createConnection(hbaseconf);
        val params = new BufferedMutatorParams(TableName.valueOf(table))

        params.writeBufferSize(8 * 1024 * 1024)
        val htable = conn.getBufferedMutator(params)

        val myTable = new HTable(hbaseconf, TableName.valueOf(table))
        myTable.setAutoFlush(false, false)
        myTable.setWriteBufferSize(20 * 1024 * 1024)

        val driver = "com.mysql.jdbc.Driver"
        val url_spark = "jdbc:mysql://10.215.4.161:3306/test"
        Class.forName(driver)
        val conn1 = DriverManager.getConnection(url_spark, "admin", "internal")
        conn1.setAutoCommit(false)
        val stmt = conn1.createStatement()

        x.foreach { y =>
          {

            val rowkey = System.currentTimeMillis().toString()
            val ran = Random.nextLong()
            // Thread.sleep(1)

            val p = new Put(Bytes.toBytes(machine_tool + "#" + cur_time(3) + "#" + fileNum + "#" + rowkey + "#" + ran))

            var column2 = ""
            for (i <- 0 to hbaseCols_scala.size - 1) {
              p.add(Bytes.toBytes("cf"), Bytes.toBytes(hbaseCols_scala(i)), Bytes.toBytes(y(i)))
              column2 = column2 + y(i)
            }

            val column1 = machine_tool + "#" + cur_time(3) + "#" + fileNum + "#" + rowkey

            val sql = "insert into test(id,val) values(" + "'" + column1 + "'" + ")"
            val rs = stmt.execute(sql)

            // myTable.put(p)
            list.add(p)
            // htable.mutate(p)
            //htable.flush()

          }

        }

        myTable.put(list)
        myTable.flushCommits()
        myTable.close()
      }
    } catch {
      case ex: Exception => println("can not connect hbase")
    }

    println("end put data into HBase" + System.currentTimeMillis().toString())
  }
上面的代码是我测试代码,现在来回答上面的问题:

问题1:spark为分布式,插入时间系列数据其实不太方便,因为分布式并不能保证顺序,我这里简单的把文件设置为1个partition,也就意味着只有一个task来处理,如果文件比较大怎么办?

问题2:spark foreach的时候即使微妙级别仍然会有时间相同,我的简单的解决方案是再添加一个random随机值,这样既可保证数据顺序,又能保证rowkey不重复。为了增加HBASE插入性能,flushcommit(false)是必须的,否者速度会很慢,

关于时间序列数据,我一直还没使用时间序列数据库来处理,但是不管怎么样,分布式的组件来处理时间系列数据,尤其是在批量插入的时候,的确会有一些很难处理的问题。之后使用时间序列数据库看看什么情况。



猜你喜欢

转载自blog.csdn.net/tom_fans/article/details/79066981