JdbcSinkConnector
public class JdbcSinkConnector extends SinkConnector {
    public Class<? extends Task> taskClass() {
    }
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
    }
    @Override
    public void start(Map<String, String> props) {
    }
    @Override
    public void stop() {
    }
    @Override
    public ConfigDef config() {
    }
    @Override
    public Config validate(Map<String, String> connectorConfigs) {
    }
    @Override
    public String version() {
    }
}
start: lifecycle method invoked when the connector starts; props holds the config parameters supplied when the connector instance was created, for example:
{
  "name": "jdbc-sink-debezium",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "mysql02.debezium_test_db.person2",
    "table.name.format": "test.person",
    "connection.url": "jdbc:db2://192.168.84.136:50000/TEST",
    "connection.user": "db2inst1",
    "connection.password": "root1234",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "insert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}
taskClass: specifies the class that actually performs the work; it must be a subclass of SinkTask
taskConfigs: the configuration handed to each task object
The main job of this class is to designate JdbcSinkTask as the task class and to pass the startup parameters through to the task objects unchanged.
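That pass-through behavior can be sketched as follows. This is a hypothetical simplification, not the connector's actual source: each task simply receives an identical copy of the connector's props.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigsSketch {
    // Hypothetical model of JdbcSinkConnector.taskConfigs: hand every task
    // an identical copy of the props the connector was started with.
    public static List<Map<String, String>> taskConfigs(
            Map<String, String> props, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(props)); // same config, one copy per task
        }
        return configs;
    }
}
```

With "tasks.max": "3", Connect would call this with maxTasks = 3 and spawn three tasks running the same configuration.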
JdbcSinkTask
public class JdbcSinkTask extends SinkTask {
    @Override
    public void start(final Map<String, String> props) {
    }
    @Override
    public void put(Collection<SinkRecord> records) {
    }
    public void stop() {
    }
}
start: props holds the startup parameters, supplied by JdbcSinkConnector's taskConfigs
put: after Connect fetches data from Kafka, it wraps each message in a SinkRecord and invokes this put method with the batch
stop: lifecycle method executed when the task shuts down
start
Let's look at the start method first:
@Override
public void start(final Map<String, String> props) {
    log.info("Starting JDBC Sink task");
    config = new JdbcSinkConfig(props); //1
    initWriter(); //2
    remainingRetries = config.maxRetries; //3
}
Line //1 wraps the incoming configuration in a JdbcSinkConfig object.
Line //2 initializes the writer field.
Line //3 reads maxRetries from the config and assigns it to remainingRetries.
Step into the initWriter method:
void initWriter() {
    if (config.dialectName != null && !config.dialectName.trim().isEmpty()) { //1
        dialect = DatabaseDialects.create(config.dialectName, config); //2
    } else {
        dialect = DatabaseDialects.findBestFor(config.connectionUrl, config); //3
    }
    final DbStructure dbStructure = new DbStructure(dialect); //4
    log.info("Initializing writer using SQL dialect: {}", dialect.getClass().getSimpleName());
    writer = new JdbcDbWriter(config, dialect, dbStructure); //5
}
Line //1 checks whether a dialect (a database dialect) was explicitly configured, so that line //2 or //3 can create the concrete DatabaseDialect object accordingly; for this walkthrough, assume it resolves to Db2DatabaseDialect.
Line //4 creates a DbStructure object, which bundles methods related to database structure, such as creating missing tables and adding missing columns.
Line //5 creates a JdbcDbWriter object, whose core write method is responsible for writing data to the database.
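The findBestFor branch at //3 effectively chooses a dialect from the JDBC URL. The real DatabaseDialects class consults a provider registry; the sketch below is only an illustration of that idea, with a hand-written prefix table rather than the actual lookup mechanism:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DialectLookupSketch {
    // Illustrative findBestFor-style resolution: choose a dialect by the
    // JDBC URL's subprotocol. The mapping here is an assumption for the
    // sketch, not the connector's real registry.
    public static String findBestFor(String connectionUrl) {
        Map<String, String> byPrefix = new LinkedHashMap<>();
        byPrefix.put("jdbc:db2:", "Db2DatabaseDialect");
        byPrefix.put("jdbc:mysql:", "MySqlDatabaseDialect");
        byPrefix.put("jdbc:postgresql:", "PostgreSqlDatabaseDialect");
        for (Map.Entry<String, String> e : byPrefix.entrySet()) {
            if (connectionUrl.startsWith(e.getKey())) {
                return e.getValue();
            }
        }
        return "GenericDatabaseDialect"; // fall back when nothing matches
    }
}
```

For the connection.url in the example config, jdbc:db2://..., this resolves to Db2DatabaseDialect, matching the assumption above.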
put
Now the put method:
@Override
public void put(Collection<SinkRecord> records) {
    if (records.isEmpty()) {
        return;
    }
    final SinkRecord first = records.iterator().next();
    final int recordsCount = records.size();
    log.debug(
        "Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the "
            + "database...",
        recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()
    );
    try {
        writer.write(records); //1
    } catch (SQLException sqle) {
        log.warn(
            "Write of {} records failed, remainingRetries={}",
            records.size(),
            remainingRetries,
            sqle
        );
        String sqleAllMessages = "";
        for (Throwable e : sqle) {
            sqleAllMessages += e + System.lineSeparator();
        }
        if (remainingRetries == 0) {
            throw new ConnectException(new SQLException(sqleAllMessages));
        } else {
            writer.closeQuietly();
            initWriter();
            remainingRetries--;
            context.timeout(config.retryBackoffMs);
            throw new RetriableException(new SQLException(sqleAllMessages));
        }
    }
    remainingRetries = config.maxRetries;
}
Focus on the happy path first and set the validation and exception handling aside; that leaves only line //1.
Line //1 writes the collection of SinkRecord objects to the database through JdbcDbWriter's write method.
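As for the exception path being skipped, its retry bookkeeping can be modeled in isolation. The sketch below is a hypothetical simulation of the counter logic, not connector code: every SQLException costs one retry and rebuilds the writer, a RetriableException makes Connect redeliver the batch, and once remainingRetries reaches zero the next failure becomes a fatal ConnectException:

```java
public class RetryBookkeepingSketch {
    // Simulates put()'s retry counter: returns the number of delivery
    // attempts needed for success, or -1 once the task gives up (the point
    // where the real code throws ConnectException instead of RetriableException).
    public static int attemptsUntilSuccess(int maxRetries, int failingAttempts) {
        int remainingRetries = maxRetries;
        for (int attempt = 1; ; attempt++) {
            boolean writeFails = attempt <= failingAttempts;
            if (!writeFails) {
                return attempt; // success; the real code resets remainingRetries here
            }
            if (remainingRetries == 0) {
                return -1; // out of retries: fatal ConnectException
            }
            remainingRetries--; // RetriableException: Connect redelivers the batch
        }
    }
}
```

With max.retries = 3 and two transient failures, the write succeeds on the third delivery; with max.retries = 1 and a persistent failure, the task fails on the second delivery.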
Step into JdbcDbWriter's write method:
void write(final Collection<SinkRecord> records) throws SQLException {
    final Connection connection = cachedConnectionProvider.getConnection(); //1
    final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>();
    for (SinkRecord record : records) {
        final TableId tableId = destinationTable(record.topic()); //2
        BufferedRecords buffer = bufferByTable.get(tableId); //3
        if (buffer == null) {
            buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection); //4
            bufferByTable.put(tableId, buffer); //5
        }
        buffer.add(record); //6
    }
    for (Map.Entry<TableId, BufferedRecords> entry : bufferByTable.entrySet()) {
        TableId tableId = entry.getKey();
        BufferedRecords buffer = entry.getValue();
        log.debug("Flushing records in JDBC Writer for table ID: {}", tableId);
        buffer.flush(); //7
        buffer.close();
    }
    connection.commit();
}
Line //1 obtains a database connection from the CachedConnectionProvider, which is initialized in JdbcDbWriter's constructor.
Line //2 resolves the destination table for the current record.
Lines //3 //4 //5 //6 together group the incoming SinkRecord collection so that all records targeting the same table end up in one BufferedRecords object.
Line //7 is where the SinkRecord objects for a given table are parsed and written to the database.
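The grouping in lines //3 through //6 can be illustrated with plain strings standing in for SinkRecord and TableId. This is a simplification: the real code buffers into BufferedRecords objects tied to a connection, but the bucketing logic is the same shape:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTableSketch {
    // Mirrors the bucketing in JdbcDbWriter.write(): records bound for the
    // same destination table share one buffer, so each table can later be
    // flushed as a single batch on the same connection.
    public static Map<String, List<String>> groupByTable(List<String[]> records) {
        Map<String, List<String>> bufferByTable = new HashMap<>();
        for (String[] rec : records) { // rec[0] = table id, rec[1] = payload
            bufferByTable
                .computeIfAbsent(rec[0], t -> new ArrayList<>())
                .add(rec[1]);
        }
        return bufferByTable;
    }
}
```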
First, what does the add method at line //6 do:
public List<SinkRecord> add(SinkRecord record) throws SQLException {
    final List<SinkRecord> flushed = new ArrayList<>();
    boolean schemaChanged = false;
    if (!Objects.equals(keySchema, record.keySchema())) { //1
        keySchema = record.keySchema(); //2
        schemaChanged = true; //3
    }
    if (isNull(record.valueSchema())) { //4
        // For deletes, both the value and value schema come in as null.
        // We don't want to treat this as a schema change if key schemas is the same
        // otherwise we flush unnecessarily.
        if (config.deleteEnabled) { //5
            deletesInBatch = true; //6
        }
    } else if (Objects.equals(valueSchema, record.valueSchema())) { //7
        if (config.deleteEnabled && deletesInBatch) { //8
            // flush so an insert after a delete of same record isn't lost
            flushed.addAll(flush()); //9
        }
    } else {
        // value schema is not null and has changed. This is a real schema change.
        valueSchema = record.valueSchema(); //10
        schemaChanged = true; //11
    }
    if (schemaChanged) { //12
        // Each batch needs to have the same schemas, so get the buffered records out
        flushed.addAll(flush());
        // re-initialize everything that depends on the record schema
        final SchemaPair schemaPair = new SchemaPair(
            record.keySchema(),
            record.valueSchema()
        );
        fieldsMetadata = FieldsMetadata.extract(
            tableId.tableName(),
            config.pkMode,
            config.pkFields,
            config.fieldsWhitelist,
            schemaPair
        );
        dbStructure.createOrAmendIfNecessary(
            config,
            connection,
            tableId,
            fieldsMetadata
        );
        final String insertSql = getInsertSql();
        final String deleteSql = getDeleteSql();
        log.debug(
            "{} sql: {} deleteSql: {} meta: {}",
            config.insertMode,
            insertSql,
            deleteSql,
            fieldsMetadata
        );
        close();
        updatePreparedStatement = dbDialect.createPreparedStatement(connection, insertSql);
        updateStatementBinder = dbDialect.statementBinder(
            updatePreparedStatement,
            config.pkMode,
            schemaPair,
            fieldsMetadata,
            config.insertMode
        );
        if (config.deleteEnabled && nonNull(deleteSql)) {
            deletePreparedStatement = dbDialect.createPreparedStatement(connection, deleteSql);
            deleteStatementBinder = dbDialect.statementBinder(
                deletePreparedStatement,
                config.pkMode,
                schemaPair,
                fieldsMetadata,
                config.insertMode
            );
        }
    }
    records.add(record); //13
    if (records.size() >= config.batchSize) {
        flushed.addAll(flush());
    }
    return flushed;
}
Lines //1 //2 //3 compare this object's keySchema with the incoming record's keySchema. The keySchema can roughly be understood as the table's primary key definition, so this block means: when the primary key definition changes, schemaChanged is set to true.
Lines //4 //5 //6: when the incoming record's valueSchema is null and deleteEnabled is true, the delete flag deletesInBatch is set to true, meaning a row will later be deleted based on this flag.
Lines //7 //8 //9: when the valueSchema is unchanged (think of it as the table structure being unchanged) but there are still unflushed delete records, a flush is performed so that an insert following a delete of the same key is not lost.
Lines //10 //11: reaching this else branch means the valueSchema is non-null but differs from the previous one, i.e. the table structure has changed, perhaps a column was added.
At line //12, schemaChanged is true only when the schema has changed, so this if branch runs; it roughly does the following: flush the buffered records, create any missing table or columns, and rebuild updateStatementBinder and deleteStatementBinder against the new schema.
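The flush-on-schema-change rule can be modeled with strings in place of Schema objects. The class below is a hypothetical simplification, not connector code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class SchemaChangeSketch {
    // Models add()'s schema tracking: when a record arrives with a different
    // value schema, the buffered records (all of the old schema) are flushed
    // first, so every batch is bound against exactly one schema.
    private String valueSchema;
    final List<String> buffer = new ArrayList<>();

    public List<String> add(String schema, String record) {
        List<String> flushed = Collections.emptyList();
        if (!Objects.equals(valueSchema, schema)) {
            valueSchema = schema;              // remember the new schema
            flushed = new ArrayList<>(buffer); // drain records of the old one
            buffer.clear();
        }
        buffer.add(record);
        return flushed;
    }
}
```

Two records of schema "v1" buffer silently; the first "v2" record forces the two "v1" records out before it is buffered itself.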
Line //13 appends the record to this BufferedRecords object's SinkRecord list, which is consumed later in the flush method.
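The size trigger just after line //13 is equally simple to model; this hypothetical buffer flushes itself as soon as batchSize records have accumulated:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchSizeSketch {
    // Models the records.size() >= config.batchSize check at the end of
    // add(): the buffer drains itself once it reaches batchSize.
    private final int batchSize;
    final List<String> records = new ArrayList<>();

    public BatchSizeSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    public List<String> add(String record) {
        records.add(record);
        if (records.size() >= batchSize) {
            List<String> flushed = new ArrayList<>(records);
            records.clear();
            return flushed;
        }
        return Collections.emptyList();
    }
}
```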
Now BufferedRecords' flush method:
public List<SinkRecord> flush() throws SQLException {
    if (records.isEmpty()) {
        log.debug("Records is empty");
        return new ArrayList<>();
    }
    log.debug("Flushing {} buffered records", records.size());
    for (SinkRecord record : records) { //1
        if (isNull(record.value()) && nonNull(deleteStatementBinder)) { //2
            deleteStatementBinder.bindRecord(record); //3
        } else {
            updateStatementBinder.bindRecord(record); //4
        }
    }
    Optional<Long> totalUpdateCount = executeUpdates(); //5
    long totalDeleteCount = executeDeletes(); //6
    final long expectedCount = updateRecordCount();
    log.trace("{} records:{} resulting in totalUpdateCount:{} totalDeleteCount:{}",
        config.insertMode, records.size(), totalUpdateCount, totalDeleteCount
    );
    if (totalUpdateCount.filter(total -> total != expectedCount).isPresent()
        && config.insertMode == INSERT) {
        throw new ConnectException(String.format(
            "Update count (%d) did not sum up to total number of records inserted (%d)",
            totalUpdateCount.get(),
            expectedCount
        ));
    }
    if (!totalUpdateCount.isPresent()) {
        log.info(
            "{} records:{} , but no count of the number of rows it affected is available",
            config.insertMode,
            records.size()
        );
    }
    final List<SinkRecord> flushedRecords = records;
    records = new ArrayList<>();
    deletesInBatch = false;
    return flushedRecords;
}
Lines //1 //2 //3 //4 iterate over the SinkRecord list and bind each record to either the delete statement (deleteStatementBinder) or the update statement (updateStatementBinder); internally this parses the record's key and value and fills them into the statement's placeholders.
Lines //5 //6 call executeBatch on the respective prepared statements; this is where the SQL is actually executed.
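The counting behind lines //5 //6 explains the Optional<Long> seen earlier: executeBatch returns one update count per bound statement, and a JDBC driver may report Statement.SUCCESS_NO_INFO (-2) instead of a real count. A hedged sketch of that summation (illustrative, not the connector's exact code):

```java
import java.sql.Statement;
import java.util.Optional;

public class UpdateCountSketch {
    // Sums the int[] returned by PreparedStatement.executeBatch(). If any
    // entry is SUCCESS_NO_INFO (-2), the driver gave no per-statement count,
    // so no total is reported and the caller skips the count check.
    public static Optional<Long> sumUpdateCounts(int[] batchResults) {
        long total = 0;
        for (int count : batchResults) {
            if (count == Statement.SUCCESS_NO_INFO) {
                return Optional.empty(); // no reliable count available
            }
            total += count;
        }
        return Optional.of(total);
    }
}
```

An empty Optional corresponds to the "no count of the number of rows it affected is available" branch in flush, while a present total that disagrees with the expected record count triggers the ConnectException in insert mode.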