记录:397
场景:使用JdbcTemplate从一个数据库批量查出数据存放在List<Map<String, Object>>中,把查出的结果数据批量写入到另一个数据库。
版本:JDK 1.8,SpringBoot 2.6.3
关键类01:org.springframework.jdbc.core.JdbcTemplate
关键类02:org.springframework.jdbc.core.BatchPreparedStatementSetter
1.批量查询和批量写入
/**
 * Copies all rows of {@code t_city} from the first database into the second one:
 * a single query loads the rows into memory, then a single JDBC batch inserts them.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    // Listed in the same order as the placeholders of the INSERT statement.
    final String[] columns = {
        "CITY_ID", "CITY_NAME", "LAND_AREA", "POPULATION",
        "GROSS", "CITY_DESCRIBE", "DATA_YEAR", "UPDATE_TIME"
    };

    // One JdbcTemplate per database: the first is read from, the second is written to.
    JdbcTemplate source = getJdbcTemplateDb01();
    JdbcTemplate target = getJdbcTemplateDb02();

    String selectSql = getSelectSQL();
    String insertSql = getInsertSQL();

    // Read phase: every selected row becomes one map keyed by column name.
    final List<Map<String, Object>> rows = source.queryForList(selectSql);

    // Write phase: bind one row per batch entry.
    BatchPreparedStatementSetter rowBinder = new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            Map<String, Object> row = rows.get(i);
            for (int c = 0; c < columns.length; c++) {
                // JDBC parameter indexes are 1-based.
                ps.setObject(c + 1, row.get(columns[c]));
            }
        }

        @Override
        public int getBatchSize() {
            // Must be the number of rows to write; batchUpdate asks for it before binding anything.
            return rows.size();
        }
    };
    target.batchUpdate(insertSql, rowBinder);
}
2.查询SQL和写入SQL
// 查询SQL
/**
 * Returns the SELECT statement used to read the eight {@code t_city} columns.
 *
 * @return the query text; it has no WHERE clause
 */
public static String getSelectSQL() {
    return "SELECT CITY_ID,CITY_NAME,LAND_AREA,POPULATION, "
            + " GROSS,CITY_DESCRIBE,DATA_YEAR,UPDATE_TIME "
            + "FROM t_city ";
}
// 插入SQL
/**
 * Returns the parameterized INSERT statement for {@code t_city}.
 *
 * @return the statement text with eight {@code ?} placeholders, one per listed column
 */
public static String getInsertSQL() {
    return "INSERT INTO t_city (\n"
            + " CITY_ID,CITY_NAME,LAND_AREA,POPULATION,\n"
            + " GROSS,CITY_DESCRIBE,DATA_YEAR,UPDATE_TIME)\n"
            + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
}
3.获取JdbcTemplate
// 操作数据库hub_a_db的JdbcTemplate
/**
 * Creates the {@code JdbcTemplate} for database {@code hub_a_db}.
 *
 * @return a template backed by a newly created Druid data source, connecting as {@code hub_a}
 */
public static JdbcTemplate getJdbcTemplateDb01() {
    return newMysqlJdbcTemplate("hub_a", "12345678", "jdbc:mysql://127.0.0.1:3306/hub_a_db");
}

/**
 * Creates the {@code JdbcTemplate} for database {@code hub_b_db}.
 *
 * @return a template backed by a newly created Druid data source, connecting as {@code hub_b}
 */
public static JdbcTemplate getJdbcTemplateDb02() {
    return newMysqlJdbcTemplate("hub_b", "12345678", "jdbc:mysql://127.0.0.1:3306/hub_b_db");
}

/**
 * Builds a {@code JdbcTemplate} on top of a new Druid data source for one MySQL account.
 * Shared by both factory methods above so the connection settings live in one place.
 *
 * @param username database account name
 * @param password database account password
 * @param jdbcUrl  JDBC URL of the target database
 * @return the configured template; every call creates a new data source
 */
private static JdbcTemplate newMysqlJdbcTemplate(String username, String password, String jdbcUrl) {
    // com.alibaba.druid.pool.DruidDataSource
    DruidDataSource dataSource = new DruidDataSource();
    dataSource.setUrl(jdbcUrl);
    dataSource.setUsername(username);
    dataSource.setPassword(password);
    // NOTE(review): this is the legacy driver class name; Connector/J 8.x names it
    // com.mysql.cj.jdbc.Driver - confirm the driver version on the classpath before changing.
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");

    JdbcTemplate jdbcTemplate = new JdbcTemplate();
    jdbcTemplate.setDataSource(dataSource);
    return jdbcTemplate;
}
4.建表语句
-- City information table. The Java code reads t_city from hub_a_db and writes t_city in
-- hub_b_db, so the table has to exist in both databases.
-- NOTE(review): CITY_ID is described as the unique identifier, but no PRIMARY KEY or UNIQUE
-- constraint is declared - confirm whether duplicate rows are acceptable.
CREATE TABLE t_city (
    CITY_ID       BIGINT(16)                    NOT NULL     COMMENT '唯一标识',
    CITY_NAME     VARCHAR(64) COLLATE utf8_bin  NOT NULL     COMMENT '城市名',
    LAND_AREA     DOUBLE                        DEFAULT NULL COMMENT '城市面积',
    POPULATION    BIGINT(16)                    DEFAULT NULL COMMENT '城市人口',
    GROSS         DOUBLE                        DEFAULT NULL COMMENT '生产总值',
    CITY_DESCRIBE VARCHAR(512) COLLATE utf8_bin DEFAULT NULL COMMENT '城市描述',
    DATA_YEAR     VARCHAR(16) COLLATE utf8_bin  DEFAULT NULL COMMENT '数据年份',
    UPDATE_TIME   DATETIME                      DEFAULT NULL COMMENT '更新时间'
)
    ENGINE=INNODB
    DEFAULT CHARSET=utf8
    COLLATE=utf8_bin
    COMMENT='城市信息表';
5.建数据库语句
-- Create database hub_a_db and the account hub_a that is granted access to it.
USE mysql;
CREATE DATABASE hub_a_db DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
CREATE USER 'hub_a'@'%' IDENTIFIED BY '12345678';
-- The password is already set by CREATE USER above. GRANT no longer accepts an
-- IDENTIFIED BY clause in MySQL 8.0, so the grant only assigns privileges.
GRANT ALL ON hub_a_db.* TO 'hub_a'@'%';
FLUSH PRIVILEGES;
-- Create database hub_b_db and the account hub_b that is granted access to it.
USE mysql;
CREATE DATABASE hub_b_db DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
CREATE USER 'hub_b'@'%' IDENTIFIED BY '12345678';
-- The password is already set by CREATE USER above. GRANT no longer accepts an
-- IDENTIFIED BY clause in MySQL 8.0, so the grant only assigns privileges.
GRANT ALL ON hub_b_db.* TO 'hub_b'@'%';
FLUSH PRIVILEGES;
6.在MySQL中查询一张表的全部字段SQL
-- List the column names of one t_city table in declaration order.
-- The TABLE_SCHEMA filter matters here: t_city exists in both hub_a_db and hub_b_db, so
-- filtering on TABLE_NAME alone returns each column once per schema the account can see.
-- Change the schema name to inspect the other database.
SELECT
    COLUMN_NAME
FROM
    INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'hub_a_db'
  AND TABLE_NAME = 't_city'
ORDER BY ORDINAL_POSITION;
7.BatchPreparedStatementSetter的getBatchSize说明
BatchPreparedStatementSetter的getBatchSize必须返回实际要写入的条数,原因是:JdbcTemplate的batchUpdate在执行时会先调用getBatchSize取得批次大小,再以它作为循环上限逐条调用setValues;如果返回0,循环体不会执行,也就不会绑定任何一行数据。源码如下:
/**
 * Executes {@code sql} once per batch entry, with {@code pss} binding the parameters of
 * each entry.
 *
 * @param sql the statement to run for every entry
 * @param pss reports how many entries there are and binds the parameters of entry {@code i}
 * @return the affected-row counts: one element per executed entry on the one-by-one path,
 *         or the array returned by {@code executeBatch()} on the batch path
 * @throws DataAccessException as declared by the signature
 */
public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Executing SQL batch update [" + sql + "]");
}
int[] result = (int[])this.execute(sql, (ps) -> {
try {
// The batch size is read once, up front; it is the upper bound of both loops below.
int batchSize = pss.getBatchSize();
// Non-null only when the setter can also signal the end of the batch early.
InterruptibleBatchPreparedStatementSetter ipss = pss instanceof InterruptibleBatchPreparedStatementSetter ? (InterruptibleBatchPreparedStatementSetter)pss : null;
if (!JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
// One-by-one path: batch updates are not supported, so each entry is executed separately.
List<Integer> rowsAffected = new ArrayList();
int ix = 0;
while(true) {
if (ix < batchSize) {
pss.setValues(ps, ix);
// Execute the entry unless the setter reports the batch as exhausted at this index.
if (ipss == null || !ipss.isBatchExhausted(ix)) {
rowsAffected.add(ps.executeUpdate());
++ix;
continue;
}
}
// Reached when ix >= batchSize or the batch was exhausted: unbox the counts and return.
int[] rowsAffectedArray = new int[rowsAffected.size()];
for(int ixx = 0; ixx < rowsAffectedArray.length; ++ixx) {
rowsAffectedArray[ixx] = (Integer)rowsAffected.get(ixx);
}
int[] var13 = rowsAffectedArray;
return var13;
}
} else {
// Batch path: queue every entry with addBatch(), then send them with a single executeBatch().
int i = 0;
while(true) {
if (i < batchSize) {
pss.setValues(ps, i);
// Queue the entry unless the setter reports the batch as exhausted at this index.
if (ipss == null || !ipss.isBatchExhausted(i)) {
ps.addBatch();
++i;
continue;
}
}
// Reached when i >= batchSize or the batch was exhausted: run whatever has been queued.
int[] var10 = ps.executeBatch();
return var10;
}
}
} finally {
// Runs whether the batch succeeded or threw: lets the setter clean up its parameters.
if (pss instanceof ParameterDisposer) {
((ParameterDisposer)pss).cleanupParameters();
}
}
});
// Guards against execute(...) handing back null instead of the callback's array.
Assert.state(result != null, "No result array");
return result;
}
以上,感谢。
2023年4月10日