套路
1. 官网示例
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html
-- Declare the HBase table 'mytable' to Flink SQL.
-- The atomic column (rowkey) maps to the HBase row key; every ROW-typed column
-- maps to one column family, and the ROW's fields are that family's qualifiers.
CREATE TABLE hTable (
    rowkey  INT,
    family1 ROW<q1 INT>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector'        = 'hbase-1.4',
    'table-name'       = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
);
-- Write into the HBase table: the ROW(...) constructor builds one value per column family.
-- Assumes source table "T" has the columns [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6].
INSERT INTO hTable
SELECT
    rowkey,
    ROW(f1q1),
    ROW(f2q2, f2q3),
    ROW(f3q4, f3q5, f3q6)
FROM T;
-- Read from the HBase table; qualifiers are addressed as family.qualifier.
SELECT
    rowkey,
    family1,
    family3.q4,
    family3.q6
FROM hTable;
-- Use the HBase table as a dimension (lookup) table: each myTopic record is joined
-- with the hTable row whose rowkey equals myTopic.key, as of myTopic.proctime.
-- myTopic must expose a processing-time attribute named proctime.
SELECT *
FROM myTopic
LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
    ON myTopic.key = hTable.rowkey;
2. 自己实现
2.1 MySQL 表结构
-- MySQL target table written by the Flink job through the JDBC connector.
-- Integer display widths such as INT(64) are deprecated in MySQL 8.0 and do not
-- affect storage, so plain INT / BIGINT is used. utf8mb4 replaces the deprecated
-- utf8 (utf8mb3) alias so any Unicode text coming from HBase can be stored.
CREATE TABLE IF NOT EXISTS `MyHbaseEtl` (
  `id`   INT         DEFAULT NULL,  -- HBase row key, converted to INT by the Flink job
  `name` VARCHAR(50) DEFAULT NULL,
  `age`  BIGINT      DEFAULT NULL,  -- stored as text in HBase, converted to BIGINT by the Flink job
  `sex`  VARCHAR(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2.2 HBase 表数据
hbase(main):036:0> desc 'student'
Table student is ENABLED
student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false',
KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',
MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false',
CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0560 seconds
hbase(main):037:0>
插入数据
put 'student', '1005', 'info:name', 'spark'
查询
hbase(main):008:0> scan 'student'
ROW COLUMN+CELL
10004 column=info:sex, timestamp=1597812900905, value=\xE5\xA5\xB3
1001 column=info:name, timestamp=1597812783278, value=nike
1002 column=info:name, timestamp=1597812806264, value=male
1003 column=info:age, timestamp=1597812866724, value=18
1003 column=info:name, timestamp=1597812836550, value=xiaoming
1005 column=info:name, timestamp=1630141371996, value=spark
5 row(s)
依赖
<!-- HBase 2.2 SQL connector: provides 'connector' = 'hbase-2.2' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- JDBC SQL connector: provides 'connector' = 'jdbc', used by the MySQL sink table -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- MySQL JDBC driver required by the JDBC connector; pick the Connector/J version that matches your MySQL server -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
代码
package com.wudl.flink.examples;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
 * Copies every row of the HBase table {@code student} (column family {@code info}) into the
 * MySQL table {@code MyHbaseEtl} using Flink SQL.
 *
 * <p>Note: despite the class name, data flows from HBase to MySQL — {@code sourceHbaseTable}
 * is read and {@code MySqlTable} is written.
 *
 * @ClassName : FlinkMySqlToHbase
 * @Description : HBase (student) -> MySQL (MyHbaseEtl) copy job written with Flink SQL
 * @Author :wudl
 * @Date: 2021-08-28 16:52
 */
public class FlinkMySqlToHbase {

    /**
     * Registers the HBase source table and the JDBC sink table, runs a single INSERT job
     * and blocks until that job has finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // HBase source. Row keys written from the HBase shell (e.g. put 'student','1005',...)
        // are UTF-8 strings, so the key is declared STRING: declaring it INT would make the
        // connector decode the raw bytes of "1005" as a 4-byte binary integer and produce a
        // meaningless id. The 'info' qualifiers, including age, are read as strings too.
        tableEnv.executeSql("CREATE TABLE sourceHbaseTable (" +
                "rowkey STRING," +
                "info ROW<name STRING, sex STRING, age STRING>," +
                "PRIMARY KEY (rowkey) NOT ENFORCED" +
                ") WITH (" +
                "'connector' = 'hbase-2.2'," +
                "'table-name' = 'student'," +
                "'zookeeper.quorum' = '192.168.1.161:2181'" +
                ")");

        // MySQL sink backed by MyHbaseEtl. Credentials are hard-coded for this demo only.
        tableEnv.executeSql("CREATE TABLE MySqlTable (" +
                "id INT," +
                "name STRING," +
                "sex STRING," +
                "age BIGINT" +
                ") WITH (" +
                "'connector' = 'jdbc'," +
                "'url' = 'jdbc:mysql://192.168.1.180:3306/MyFlink?useUnicode=true&characterEncoding=UTF-8'," +
                "'table-name' = 'MyHbaseEtl'," +
                "'username' = 'root'," +
                "'password' = '123456'" +
                ")");

        // Flatten the 'info' family and convert the string-encoded key and age to numbers.
        Table hbaseRows = tableEnv.sqlQuery(
                "SELECT CAST(rowkey AS INT) AS id, info.name AS name, info.sex AS sex, " +
                "CAST(info.age AS BIGINT) AS age FROM sourceHbaseTable");

        // executeInsert only submits the job; wait for it so main does not return
        // before the rows have been written to MySQL.
        TableResult insertResult = hbaseRows.executeInsert("MySqlTable");
        try {
            insertResult.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the HBase -> MySQL job", e);
        } catch (java.util.concurrent.ExecutionException e) {
            throw new IllegalStateException("HBase -> MySQL job failed", e);
        }
    }
}
Flink 对应 HBase 的数据类型