Flink allows users to use any file system that implements the org.apache.hadoop.fs.FileSystem interface, for example S3, the Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP, and others.
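As a minimal sketch (assuming the matching file system support is on the classpath; the bucket and path below are placeholders), such a file system can be read simply by using its URI scheme:

```scala
import org.apache.flink.api.scala._

object ReadFromS3Example {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Placeholder URI; any supported scheme (s3://, alluxio://, ftp://, ...) is used the same way
    env.readTextFile("s3://my-bucket/input/words.txt").print()
  }
}
```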
- Flink is compatible with the Apache Hadoop MapReduce interfaces, so code written for Hadoop MapReduce can be reused:
- Use Hadoop Writable data types
- Use any Hadoop InputFormat as a DataSource (via Flink's built-in HadoopInputFormat wrapper; see the sketch after this list)
- Use any Hadoop OutputFormat as a DataSink (via Flink's built-in HadoopOutputFormat wrapper)
- Use a Hadoop Mapper as a FlatMapFunction
- Use a Hadoop Reducer as a GroupReduceFunction
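As an illustration of the first points, here is a minimal sketch of reading a text file through Hadoop's TextInputFormat with the HadoopInputs helper from flink-hadoop-compatibility; the input path hdfs://node01:8020/input is only a placeholder.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object HadoopCompatReadExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Wrap Hadoop's (mapred) TextInputFormat as a Flink DataSource.
    // The path is a placeholder; point it at a real file to run this.
    val input: DataSet[(LongWritable, Text)] = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat, classOf[LongWritable], classOf[Text],
        "hdfs://node01:8020/input"))

    // From here on it is an ordinary Flink DataSet: drop the byte offsets
    // and keep only the line contents.
    input.map(_._2.toString).print()
  }
}
```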
- Flink integration with HBase
Step 1: Create the HBase table and insert some data
```
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0002','f1:age','18'
```
Step 2: Add the integration dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <!-- no 1.8.1 release of this artifact yet -->
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
```
Step 3: Read data from HBase
```scala
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Build a DataSet from HBase by extending TableInputFormat:
    // configure() opens the connection and the scan, mapResultToTuple()
    // converts each HBase Result into a (rowkey, values) tuple.
    val hbaseData: DataSet[tuple.Tuple2[String, String]] = environment.createInput(
      new TableInputFormat[tuple.Tuple2[String, String]] {

        override def configure(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn: Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          scan = new Scan() {
            // setStartRow(Bytes.toBytes("1001"))
            // setStopRow(Bytes.toBytes("1004"))
            addFamily(Bytes.toBytes("f1"))
          }
        }

        override def getScanner: Scan = scan

        override def getTableName: String = "hbasesource"

        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey: String = Bytes.toString(result.getRow)
          val sb = new StringBuffer()
          // Concatenate all cell values of the row, comma separated
          for (cell: Cell <- result.rawCells()) {
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            sb.append(value).append(",")
          }
          // Drop the trailing comma
          val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
          val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
          tuple2.setField(rowkey, 0)
          tuple2.setField(valueString, 1)
          tuple2
        }
      })

    // print() triggers execution of the batch job itself,
    // so no separate environment.execute() call is needed here.
    hbaseData.print()
  }
}
```
Step 4: Write data to HBase
- Option 1: implement the OutputFormat interface (shown in the example below)
- Option 2: extend RichSinkFunction and override its methods (a sketch of this approach follows the OutputFormat example)
```scala
import java.util

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("01,zhangsan,28", "02,lisi,30")
    sourceDataSet.output(new HBaseOutputFormat)
    environment.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "node01"
  val port = "2181"
  var conn: Connection = null

  override def configure(configuration: Configuration): Unit = {
  }

  // Open one HBase connection per parallel task instance
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
  }

  // Each record has the layout "rowkey,name,age"
  override def writeRecord(it: String): Unit = {
    val tableName: TableName = TableName.valueOf("hbasesource")
    val cf1 = "f1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    val putList: util.ArrayList[Put] = new util.ArrayList[Put]
    putList.add(put)
    // Use a 1 MB write buffer; data is flushed to HBase automatically once it fills up
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set the buffer size
    params.writeBufferSize(1024 * 1024)
    val mutator: BufferedMutator = conn.getBufferedMutator(params)
    mutator.mutate(putList)
    mutator.flush()
    // release the mutator's resources (it was created for this record only)
    mutator.close()
    putList.clear()
  }

  override def close(): Unit = {
    if (null != conn) {
      conn.close()
    }
  }
}
```
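Option 2 is only named above, so here is a minimal, hedged sketch of what a RichSinkFunction-based sink might look like with the DataStream API. The class name HBaseSinkFunction and the sample records are assumptions, and it writes one Put per record without buffering.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical sink: expects the same "rowkey,name,age" record layout as above
class HBaseSinkFunction extends RichSinkFunction[String] {
  private var conn: Connection = _
  private var table: Table = _

  // Open the HBase connection once per parallel sink instance
  override def open(parameters: Configuration): Unit = {
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "node01")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(TableName.valueOf("hbasesource"))
  }

  // Called once per incoming record
  override def invoke(value: String): Unit = {
    val fields = value.split(",")
    val put = new Put(Bytes.toBytes(fields(0)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}

object FlinkStreamWriteHBase {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("03,wangwu,25", "04,zhaoliu,33")
      .addSink(new HBaseSinkFunction)
    env.execute("stream-write-hbase")
  }
}
```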