MySQL to Kafka through Canal
1. Configure pom.xml
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>
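Before the client can receive anything, the Canal server must be able to read the MySQL binlog in ROW format. A minimal sketch of the usual prerequisites; the address, user, and password below are assumptions mirroring the connection code in step 3, not values given in this post:
# my.cnf on the MySQL host (assumed setup): row-based binlog
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

# conf/example/instance.properties on the Canal server (assumed values)
canal.instance.master.address=192.168.2.66:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal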
2. Main method entry point
public class CanalToKafkaServer {
    public static void main(String[] args) {
        SimpleCanalClient simpleCanalClient = new SimpleCanalClient();
        try {
            simpleCanalClient.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3. Canal client
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClient {

    private CanalConnector connector = null;

    public SimpleCanalClient() {
        // Create the connection to the Canal server instance "example"
        connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.2.66", 11111), "example", "canal", "canal");
    }

    public void execute() {
        int batchSize = 1;
        int emptyCount = 0;
        CanalKafkaProducer producer = new CanalKafkaProducer();
        try {
            connector.connect();
            connector.subscribe(".*\\..*");        // subscribe to all schemas and tables
            // connector.subscribe("test.test1");  // or subscribe to a single table
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without auto-ack
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                    }
                } else {
                    emptyCount = 0;
                    StringBuffer stringBuffer = ParseMessage.concatEntry(message.getEntries());
                    producer.send("canalDemo", stringBuffer.toString());
                }
                connector.ack(batchId);            // acknowledge the batch
                // connector.rollback(batchId);    // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
            producer.close();
        }
    }
}
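The getWithoutAck/ack protocol is what makes redelivery possible: a batch stays pending on the server until it is acknowledged. The loop above acks even when the Kafka send fails; a minimal sketch of a stricter loop body, acking only after a successful send and using the rollback call that is commented out above:
try {
    StringBuffer payload = ParseMessage.concatEntry(message.getEntries());
    producer.send("canalDemo", payload.toString());
    connector.ack(batchId);       // commit only after the send call returned normally
} catch (Exception e) {
    connector.rollback(batchId);  // redeliver this batch on the next getWithoutAck
}
Because producer.send is asynchronous, a fully reliable version would also wait on the send result; see the callback variant after step 5.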
4. Parse the binlog
import java.util.List;

import com.alibaba.otter.canal.protocol.CanalEntry;

public class ParseMessage {

    public static StringBuffer concatEntry(List<CanalEntry.Entry> entries) {
        StringBuffer sb = new StringBuffer();
        int dataCount = 0;
        for (int count = 0; count < entries.size(); count++) {
            CanalEntry.Entry entry = entries.get(count);
            // Skip transaction begin/end markers; only row-data entries carry changes
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            dataCount++;
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## failed to parse row-change event, data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            StringBuffer message = new StringBuffer();
            message.append("database:" + entry.getHeader().getSchemaName() + ",");
            message.append("table:" + entry.getHeader().getTableName() + ",");
            message.append("eventType:" + eventType + ",");
            message.append("data:");
            for (int index = 0; index < rowChange.getRowDatasList().size(); index++) {
                CanalEntry.RowData rowData = rowChange.getRowDatasList().get(index);
                if (eventType == CanalEntry.EventType.DELETE) {
                    // DELETE: only the before image exists
                    message.append(concatColumn(rowData.getBeforeColumnsList()));
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // INSERT: only the after image exists
                    message.append(concatColumn(rowData.getAfterColumnsList()));
                } else {
                    // UPDATE: emit "before after" images separated by a space
                    message.append(concatColumn(rowData.getBeforeColumnsList()) + " ");
                    message.append(concatColumn(rowData.getAfterColumnsList()));
                }
                if (index != rowChange.getRowDatasList().size() - 1) {
                    message.append("&"); // "&" separates rows within one event
                }
            }
            sb.append(message);
            // Each transaction contributes a BEGIN, a row-data entry and an END,
            // so row-data entries number roughly entries.size()/3; add a line
            // break between events but not after the last one
            if (dataCount < entries.size() / 3) {
                sb.append("\r\n");
            }
        }
        return sb;
    }

    private static StringBuffer concatColumn(List<CanalEntry.Column> columns) {
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < columns.size(); i++) {
            CanalEntry.Column column = columns.get(i);
            buffer.append(column.getName() + "_" + column.getValue()); // "name_value"
            if (i < columns.size() - 1) {
                buffer.append("-"); // "-" separates columns
            }
        }
        return buffer;
    }
}
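On the consuming side, the flat string has to be split back apart with the same delimiters: "," between fields, "&" between rows, a space between the before/after images of an UPDATE, "-" between columns, and "_" between a column name and its value. A minimal, hypothetical decoder sketch (not part of the original code; this naive splitting breaks as soon as a column value contains one of the delimiters, which is why a structured format such as JSON is often preferred in practice):
public class MessageDecoder {

    // Hypothetical helper: decode one line produced by ParseMessage.concatEntry
    public static void decode(String line) {
        String[] parts = line.split(",", 4); // database, table, eventType, data
        String database = parts[0].substring("database:".length());
        String table = parts[1].substring("table:".length());
        String eventType = parts[2].substring("eventType:".length());
        String data = parts[3].substring("data:".length());
        for (String row : data.split("&")) {            // "&" separates rows
            for (String image : row.split(" ")) {       // UPDATE rows carry "before after" images
                for (String column : image.split("-")) {
                    String[] nv = column.split("_", 2); // "name_value"
                    System.out.println(database + "." + table + " " + eventType
                            + " " + nv[0] + "=" + (nv.length > 1 ? nv[1] : ""));
                }
            }
        }
    }
}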
5. Producer
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, String> producer;

    public CanalKafkaProducer() {
        // 1. Create the Kafka producer configuration
        Properties properties = new Properties();
        // 2. Kafka cluster to connect to
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "slave1:9092,slave2:9092,slave3:9092");
        // 3. ACK level: wait for all in-sync replicas
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 4. Maximum number of send retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 5. Batch size: 16384 bytes = 16 KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 6. Linger time: how long to wait for a batch to fill before sending
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 7. Send buffer size: 33554432 bytes = 32 MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // 8. Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner (optional)
        // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.CustomPartitioner");
        // 9. Create the producer
        producer = new KafkaProducer<>(properties);
    }

    public void close() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("## something went wrong when stopping the kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(String topic, String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic, message);
        }
    }
}
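Note that send above is fire-and-forget: the Future returned by producer.send is discarded, so a broker-side failure goes unnoticed and the Canal batch gets acked anyway. A minimal sketch of a callback-based variant (a hypothetical addition to the class above, using the standard two-argument send of kafka-clients):
public void sendWithCallback(String topic, String message) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // A failed send surfaces here; the caller could roll back the Canal batch
            logger.error("failed to send message to kafka topic: {}", topic, exception);
        } else if (logger.isDebugEnabled()) {
            logger.debug("sent to {}-{}@{}", metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
}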
The parsed messages sent to Kafka look like this:
database:test,table:t_stu,eventType:INSERT,data:id_10-name_zhangsan&id_11-name_lisi
database:test,table:t_stu,eventType:UPDATE,data:id_8-name_xixi id_8-name_new&id_9-name_xixi id_9-name_new
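To verify the pipeline end to end, the topic can be read back with the console consumer that ships with Kafka (broker address taken from the producer config above):
bin/kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic canalDemo --from-beginning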
If anything is unclear, feel free to reach out and discuss.
Next post: Writing Kafka Data into Hudi