整个逻辑为: 设备端毫秒级生成数据以文本方式传输到HDFS,然后通过SPARK解析文件并存储到HBASE中。
问题1:SPARK本身为分布式,如果通过分布式解析文件并存储,如何保证数据系列顺序?
问题2:使用SPARK进行HBASE插入的时候,使用RddPartitionForeach,然后给每条记录生成时间戳,并插入到HBASE,发现即使是微妙级别,在foreach的时候仍然会出现时间相同的情况,这样会导致Rowkey相同。
整个代码如下:
try { bankRDD.foreachPartition { x => val hbaseconf = HBaseConfiguration.create() hbaseconf.set("hbase.zookeeper.quorum", "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com") hbaseconf.set("hbase.zookeeper.property.clientPort", "2181") hbaseconf.set("maxSessionTimeout", "6") val conn = ConnectionFactory.createConnection(hbaseconf); val params = new BufferedMutatorParams(TableName.valueOf(table)) params.writeBufferSize(8 * 1024 * 1024) val htable = conn.getBufferedMutator(params) val myTable = new HTable(hbaseconf, TableName.valueOf(table)) myTable.setAutoFlush(false, false) myTable.setWriteBufferSize(20 * 1024 * 1024) val driver = "com.mysql.jdbc.Driver" val url_spark = "jdbc:mysql://10.215.4.161:3306/test" Class.forName(driver) val conn1 = DriverManager.getConnection(url_spark, "admin", "internal") conn1.setAutoCommit(false) val stmt = conn1.createStatement() x.foreach { y => { val rowkey = System.currentTimeMillis().toString() val ran = Random.nextLong() // Thread.sleep(1) val p = new Put(Bytes.toBytes(machine_tool + "#" + cur_time(3) + "#" + fileNum + "#" + rowkey + "#" + ran)) var column2 = "" for (i <- 0 to hbaseCols_scala.size - 1) { p.add(Bytes.toBytes("cf"), Bytes.toBytes(hbaseCols_scala(i)), Bytes.toBytes(y(i))) column2 = column2 + y(i) } val column1 = machine_tool + "#" + cur_time(3) + "#" + fileNum + "#" + rowkey val sql = "insert into test(id,val) values(" + "'" + column1 + "'" + ")" val rs = stmt.execute(sql) // myTable.put(p) list.add(p) // htable.mutate(p) //htable.flush() } } myTable.put(list) myTable.flushCommits() myTable.close() } } catch { case ex: Exception => println("can not connect hbase") } println("end put data into HBase" + System.currentTimeMillis().toString()) }上面的代码是我测试代码,现在来回答上面的问题:
问题1:spark为分布式,插入时间系列数据其实不太方便,因为分布式并不能保证顺序,我这里简单的把文件设置为1个partition,也就意味着只有一个task来处理,如果文件比较大怎么办?
问题2:spark foreach的时候即使微妙级别仍然会有时间相同,我的简单的解决方案是再添加一个random随机值,这样既可保证数据顺序,又能保证rowkey不重复。为了增加HBASE插入性能,flushcommit(false)是必须的,否者速度会很慢,
关于时间序列数据,我一直还没使用时间序列数据库来处理,但是不管怎么样,分布式的组件来处理时间系列数据,尤其是在批量插入的时候,的确会有一些很难处理的问题。之后使用时间序列数据库看看什么情况。