相信从事大数据开发的人员,越来越多的人从事实时计算方向,flink技术就显示十分重要,说该技术重要,不仅仅是因为它的流式计算,更多的是和其他技术的整合比较强大,在开发过程中,除了写入消息中间件等场景,有的时候也需要写入传统的数据库,如Oracle或者MySql。
我们习惯于连接关系型数据库的时候采用一些连接池如c3p0,在传统的业务开发或者数据量不是很大的时候,是没有问题的,但是在大数据量的情况,这种方式写入速率是远远不够的。说到这里,博主相信很多人会说增大并行度,通过牺牲资源来提升效率,这种方式虽然可以实现效率的提升,但是会过多的消耗flink的slot,而且,连接池的大小也不是很好掌控,连接过多,数据库的连接压力太大,还要注意的是,连接池关闭情况是否良好,当flink任务重启或者集群重启的时候,连接池的连接是否释放,这些问题博主在开发的过程中都遇到过,所以在此奉上博主的解决办法,本人采用的是优化sql及多线程的方式
本人使用Oracle场景:根据当前数据查询Oracle,如果Oracle有当前数据,对当前数据更新,如果没有当前数据,把当前数据插入Oracle。
sql优化
我相信,大多数非专门从事ETL的人都会先select,再进行insert或者update,这样是没有问题的,但是数据的压力会很大,本人再用merger into的方式,将压力从数据库转移到sql本身的计算上,在后面博主会贴上本人的s’q’l。
采用多线程代替连接池
连接池可以避免频繁的创建连接,但是f’lin’k的open方法同样可以实现该功能,因为该方法只加载一次
话不多说,上代码
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ***OracleSinkMutlThread extends RichSinkFunction<Tuple5<String, Long, Long, Double, Double>> {
private static final Logger LOGGER = LoggerFactory.getLogger(***OracleSinkMultThread.class);
private List<Tuple2<Connection, PreparedStatement>> connectionList = new ArrayList<>();
private int index = 0;
private ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("oracle.jdbc.driver.OracleDriver");
for (int i = 0; i < 10; i++) {
Connection connection = DriverManager.getConnection("***", "***", "***");
PreparedStatement statement = connection.prepareStatement("merge 表名 a using (select ? DAY_TIME,? HOUR_TIME,? PROV_NAME,? INTERFACE_NAME,? TOTAL_COUNT,? FAIL_COUNT,? TOTAL_TIME,? FAIL_TIME from dual) b on(a.DAY_TIME = b.DAY_TIME and a.HOUR_TIME=b.HOUR_TIME and a.PROV_NAME=b.PROV_NAME and a.INTERFACE_NAME=b.INTERFACE_NAME) when matched then update set a.TOTAL_COUNT=a.TOTAL_COUNT+b.TOTAL_COUNT, a.FAIL_COUNT=a.FAIL_COUNT+b.FAIL_COUNT,a.TOTAL_TIME=a.TOTAL_TIME+b.TOTAL_TIME,a.FAIL_TIME=a.FAIL_TIME+b.FAIL_TIME where DAY_TIME=? and HOUR_TIME=? and PROV_NAME=? and INTERFACE_NAME=? when not matched then insert values (b.DAY_TIME,b.HOUR_TIME,b.PROV_NAME,b.INTERFACE_NAME,b.TOTAL_COUNT,b.FAIL_COUNT,b.TOTAL_TIME,b.FAIL_TIME,?)");
connectionList.add(new Tuple2(connection, statement));
}
executorService = Executors.newFixedThreadPool(10);
}
@Override
public void invoke(Tuple5<String, Long, Long, Double, Double> value, Context context) throws Exception {
String[] split = value.f0.split("_");
String[] time = split[2].split("\t");
String day_time = time[1];
String hour_time = time[2];
String provinceName = split[3];
String INTERFACE_NAME = split[4];
PreparedStatement statement;
try {
statement = connectionList.get(index).f1;
statement.setString(1,day_time);
statement.setString(2,hour_time);
statement.setString(3,provinceName);
statement.setString(4,INTERFACE_NAME);
statement.setString(5,value.f1+"");
statement.setString(6,value.f2+"");
statement.setString(7,value.f3+"");
statement.setString(8,value.f4+"");
statement.setString(9,day_time);
statement.setString(10,hour_time);
statement.setString(11,provinceName);
statement.setString(12,INTERFACE_NAME);
long current_time = new Date().getTime();
java.sql.Date dateSql = new java.sql.Date(current_time);
statement.setDate(13,dateSql);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
statement.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
});
index += 1;
if (index == 10) {
index = 0;
}
}catch (SQLException e){
e.printStackTrace();
LOGGER.error(String.format("%s -> *** !", "***"));
}
}
@Override
public void close() throws Exception {
super.close();
if (executorService != null) {
executorService.shutdown();
}
for (Tuple2<Connection, PreparedStatement> tuple2 : connectionList) {
if (tuple2.f0 != null) {
tuple2.f0.close();
}
if (tuple2.f1 != null) {
tuple2.f1.close();
}
}
}
}
该方法实现了richSinkFunction,再程序刚执行的时候,此种实现方式初始化很快,因为传统的连接池会花费大量时间进行连接池的初始化,可以通过控制循环的大小控制程序的并行度,减少集群slot的消耗,同时,再任务关闭的时候,连接释放的也没有问题。