Approach
Data in MySQL can be synchronized in batches; the same principle appears in Sqoop's source code and gives noticeably better throughput.
The implementation is simple: first query MySQL for the minimum and maximum values of a split field, let the batch size be specified by the user so it stays decoupled from the code, and then dynamically splice a between ? and ? range into the extraction SQL so that each batch reads its own slice of the table, as sketched below.
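To make the splitting concrete, here is a rough, self-contained sketch of how a [min, max] range can be cut into per-batch parameter pairs. It only illustrates the idea behind Flink's NumericBetweenParametersProvider and is not the library's actual source; the values 1, 7 and 2 are made-up sample numbers.
import java.io.Serializable;
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        long min = 1, max = 7, fetchSize = 2;   // sample boundary and rows per batch
        long count = max - min + 1;
        int batches = (int) (count / fetchSize + (count % fetchSize == 0 ? 0 : 1));
        Serializable[][] parameters = new Serializable[batches][2];
        long start = min;
        for (int i = 0; i < batches; i++) {
            long end = Math.min(start + fetchSize - 1, max);
            // lower/upper bound that fills one "between ? and ?" query
            parameters[i] = new Serializable[]{start, end};
            start = end + 1;
        }
        // prints [[1, 2], [3, 4], [5, 6], [7, 7]] -- one row per parallel batch
        System.out.println(Arrays.deepToString(parameters));
    }
}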
Implementation
Main program
package dbus.fullPull;
import dbus.config.GlobalConfig;
import dbus.utils.JdbcUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
public class fullPullApp {
//whether to read MySQL in parallel with range-split queries
public static final boolean isparallelism = true;
//split field used to partition the source table
public static final String SPLIT_FIELD = "goodsId";
//row schema of zyd_goods: goodsId, goodsName, sellingPrice, goodsStock, appraiseNum
public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
public static void main(String[] args) throws Exception {
//obtain the batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//build a JDBC input format for the goods table
JDBCInputFormat.JDBCInputFormatBuilder jdbcInputFormatBuilder = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(GlobalConfig.DRIVER_CLASS)
.setDBUrl(GlobalConfig.DB_URL)
.setUsername(GlobalConfig.USER_MAME)
.setPassword(GlobalConfig.PASSWORD)
.setQuery("select * from zyd_goods")
.setRowTypeInfo(ROW_TYPE_INFO);
if (isparallelism) {
//rows per split; in practice this should be user-configurable
int fetchSize = 2;
//query the min and max of the split field up front
Boundary boundary = boundaryQuery(SPLIT_FIELD);
//rewrite the query so each batch reads its own "between ? and ?" range
jdbcInputFormatBuilder.setQuery("select * from zyd_goods where "+SPLIT_FIELD+" between ? and ?")
.setParametersProvider(new NumericBetweenParametersProvider(fetchSize,boundary.min,boundary.max));
}
//read the MySQL data as a DataSet
DataSet<Row> source = env.createInput(jdbcInputFormatBuilder.finish());
//print() triggers an eager execution for debugging; the HBase sink below runs in env.execute()
source.print();
//convert the rows into HBase output records
DataSet<Tuple2<Text, Mutation>> hbaseResult = convertMysqlToHbase(source);
//write the data to HBase
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "note01,note02,note03");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.set(TableOutputFormat.OUTPUT_TABLE, "learing_flink:zyd_goods");
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
//create a Job instance that carries the HBase configuration
Job job = Job.getInstance(conf);
hbaseResult.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<>(), job));
env.execute("FullPullerAPP");
}
private static DataSet<Tuple2<Text, Mutation>> convertMysqlToHbase(DataSet<Row> dataSet) {
return dataSet.map(new RichMapFunction<Row, Tuple2<Text, Mutation>>() {
private transient Tuple2<Text, Mutation> resultTp;
private byte[] cf = "F".getBytes(ConfigConstants.DEFAULT_CHARSET);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
resultTp = new Tuple2<>();
}
@Override
public Tuple2<Text, Mutation> map(Row value) throws Exception {
resultTp.f0 = new Text(value.getField(0).toString());
Put put = new Put(value.getField(0).toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
if (null != value.getField(1)) {
put.addColumn(cf, Bytes.toBytes("goodsName"), Bytes.toBytes(value.getField(1).toString()));
}
put.addColumn(cf, Bytes.toBytes("sellingPrice"), Bytes.toBytes(value.getField(2).toString()));
put.addColumn(cf, Bytes.toBytes("goodsStock"), Bytes.toBytes(value.getField(3).toString()));
put.addColumn(cf, Bytes.toBytes("appraiseNum"), Bytes.toBytes(value.getField(4).toString()));
resultTp.f1 = put;
return resultTp;
}
});
}
//query the database in advance for the min and max of the split field
public static Boundary boundaryQuery(String splitField) throws Exception {
String sql = "select min(" + splitField + ") , max(" + splitField + ") from zyd_goods";
Connection connection = null;
Statement statement = null;
ResultSet resultSet = null;
int min = 0;
int max = 0;
try {
connection = JdbcUtil.getConnection();
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
min = resultSet.getInt(1);
max = resultSet.getInt(2);
System.out.println(min + "------------------" + max);
}
} finally {
JdbcUtil.close(resultSet, statement, connection);
}
return Boundary.of(min, max);
}
public static class Boundary {
private int min;
private int max;
public Boundary(int min, int max) {
this.min = min;
this.max = max;
}
public static Boundary of(int min, int max) {
return new Boundary(min, max);
}
}
}
Basic JdbcUtil
package dbus.utils;
import dbus.config.GlobalConfig;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
 * Common JDBC helper methods
 * @Date: 2019/3/4 6:00 PM
*
*/
public class JdbcUtil {
//url
private static String url = GlobalConfig.DB_URL;
//user
private static String user = GlobalConfig.USER_MAME;
//password
private static String password = GlobalConfig.PASSWORD;
//driver class
private static String driverClass = GlobalConfig.DRIVER_CLASS;
/**
 * Register the driver only once, in a static block
*/
static{
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
/**
 * Get a connection
*/
public static Connection getConnection(){
try {
Connection conn = DriverManager.getConnection(url, user, password);
return conn;
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
 * Release resources (statement and connection)
*/
public static void close(Statement stmt,Connection conn){
if(stmt!=null){
try {
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
if(conn!=null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
/**
 * Release resources (result set, statement, and connection)
*/
public static void close(ResultSet rs,Statement stmt,Connection conn){
if(rs!=null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
if(stmt!=null){
try {
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
if(conn!=null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
System.out.println(JdbcUtil.getConnection());
}
}
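Both classes read their connection settings from a GlobalConfig class that is not shown in this post. A minimal sketch of what it might look like, with placeholder values rather than the real configuration (the constant names, including USER_MAME, are kept exactly as they are referenced in the code above):
package dbus.config;

//Hypothetical stand-in for the GlobalConfig referenced above; the real class is
//not part of this post, so all values below are placeholders.
public class GlobalConfig {
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";       // MySQL JDBC driver
    public static final String DB_URL = "jdbc:mysql://localhost:3306/test";  // placeholder URL
    public static final String USER_MAME = "root";                           // name kept as referenced in the code
    public static final String PASSWORD = "123456";                          // placeholder password
}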