Flink-Kafka-MySQL [transactions, externally managed offsets]
Sometimes the business requires end-to-end exactly-once semantics. If we store the Kafka offsets externally and pair that with a database transaction, we can achieve exactly-once delivery end to end. (Of course, when it is an option, simply making the writes idempotent is enough and saves all of this trouble.)
Without further ado, here is the example.
First, create a student table and a table that holds the Kafka offsets.
CREATE TABLE `kafka` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(11) DEFAULT NULL,
  `partition` int(11) DEFAULT NULL,
  `offset` bigint(20) DEFAULT NULL,
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `UK_topic_partition_date` (`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `student` (
  `id` int(11) NOT NULL,
  `name` varchar(100) NOT NULL,
  `pass` varchar(40) NOT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
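Both the producer and the Flink job below use a Student POJO that is not shown in the original post. Here is a minimal sketch, assuming a plain constructor for the business fields plus a second constructor that also carries the Kafka metadata (topic, partition, offset) needed by the sink:

package com.nuc.test;

/** Simple POJO matching the student table, optionally carrying Kafka metadata. */
public class Student {
    private int id;
    private String name;
    private String pass;
    private int age;
    // Kafka metadata, only filled in by the Flink job.
    private String topic;
    private int partition;
    private long offset;

    public Student() {
    }

    public Student(int id, String name, String pass, int age) {
        this.id = id;
        this.name = name;
        this.pass = pass;
        this.age = age;
    }

    public Student(int id, String name, String pass, int age, String topic, int partition, long offset) {
        this(id, name, pass, age);
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPass() { return pass; }
    public void setPass(String pass) { this.pass = pass; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public long getOffset() { return offset; }
}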
Next, we write some test data to Kafka.
package com.nuc.transaction;
import com.alibaba.fastjson.JSON;
import com.nuc.test.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Writes test data to Kafka.
 */
public class TestKafkaProducer {
    public static final String broker_list = "172.16.10.102:9092,172.16.10.103:9092,172.16.10.104:9092";
    public static final String topic = "test1";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i <= 10000; i++) {
            Student student = new Student(i, "yrx" + i, "password" + i, 18 + i);
            // The value is the Student serialized as JSON; key and partition are left null.
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("sent: " + JSON.toJSONString(student));
            Thread.sleep(500);
        }
        producer.flush();
        producer.close();
    }
}
Next, we consume the data from Kafka. Here we use Flink's KeyedDeserializationSchema so that, in addition to each record's value, we also get its topic, partition and offset.
package com.nuc.transaction;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nuc.test.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.*;
/**
 * Consumes from Kafka and stores the offsets externally in MySQL.
 */
public class TestKafkaMysql {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Processing time: events are stamped with the machine's system clock.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "test1:9092,test2:9092,test3:9092");
        properties.put("group.id", "test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "latest");

        final ReadKafka readKafka = new ReadKafka();
        // Read the previously stored Kafka offsets from MySQL.
        final ArrayList<String> kafkaOffset = readKafka.getKafkaOffset();

        FlinkKafkaConsumer010<KafkaDemo> test1 = new FlinkKafkaConsumer010<>("test1", new KeyedDeserializationSchema<KafkaDemo>() {
            @Override
            public KafkaDemo deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
                // Wrap the value together with its topic, partition and offset; the key could be carried the same way if needed.
                return new KafkaDemo(topic, new String(message), partition, offset);
            }

            @Override
            public boolean isEndOfStream(KafkaDemo kafkaDemo) {
                return false;
            }

            @Override
            public TypeInformation<KafkaDemo> getProducedType() {
                // Type information of the records produced by this schema.
                return TypeInformation.of(new TypeHint<KafkaDemo>() {
                });
            }
        }, properties);

        FlinkKafkaConsumerBase<KafkaDemo> flinkKafkaConsumerBase = test1.setStartFromLatest();
        // If offsets were found in MySQL, start from them instead of the latest offsets.
        if (kafkaOffset != null && kafkaOffset.size() > 0) {
            System.out.println("starting from the offsets stored in MySQL");
            Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
            for (String s : kafkaOffset) {
                // Each entry has the form "topic-partition-offset".
                final String[] strings = s.split("-");
                specificStartOffsets.put(new KafkaTopicPartition(strings[0], Integer.parseInt(strings[1])), Long.parseLong(strings[2]));
            }
            flinkKafkaConsumerBase = test1.setStartFromSpecificOffsets(specificStartOffsets);
        }

        final SingleOutputStreamOperator<Student> map = env.addSource(flinkKafkaConsumerBase).map(new MapFunction<KafkaDemo, Student>() {
            @Override
            public Student map(KafkaDemo kafkaDemo) throws Exception {
                final JSONObject jsonObject = JSON.parseObject(kafkaDemo.getMessage());
                int id = Integer.parseInt(jsonObject.get("id").toString());
                String name = jsonObject.get("name").toString();
                String pass = jsonObject.get("pass").toString();
                final int age = Integer.parseInt(jsonObject.get("age").toString());
                // Carry the Kafka metadata along with the business fields.
                return new Student(id, name, pass, age, kafkaDemo.getTopic(), kafkaDemo.getPartition(), kafkaDemo.getOffset());
            }
        });

        map.timeWindowAll(Time.seconds(10))
                .apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                        ArrayList<Student> students = Lists.newArrayList(values);
                        if (students.size() > 0) {
                            System.out.println("students collected in the last 10 seconds: " + students.size());
                            out.collect(students);
                        }
                    }
                }).addSink(new SinkMySQL());

        env.execute("jisuan");
    }
}
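The job above references two helper classes that the post does not show: KafkaDemo, a small wrapper for a record's value plus its topic, partition and offset, and ReadKafka, which loads the stored offsets from MySQL. The sketches below are assumptions consistent with how they are used above; in particular, getKafkaOffset() is assumed to return entries in the form "topic-partition-offset", and it adds 1 to the stored offset because setStartFromSpecificOffsets expects the offset of the next record to read.

package com.nuc.transaction;

/** Wrapper for a Kafka record's value plus its topic, partition and offset (a plain Flink POJO). */
public class KafkaDemo {
    private String topic;
    private String message;
    private int partition;
    private long offset;

    public KafkaDemo() {
    }

    public KafkaDemo(String topic, String message, int partition, long offset) {
        this.topic = topic;
        this.message = message;
        this.partition = partition;
        this.offset = offset;
    }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
}

package com.nuc.transaction;

import com.nuc.jdbc.DruidUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;

/** Loads the stored offsets from MySQL as "topic-partition-offset" strings. */
public class ReadKafka {
    public ArrayList<String> getKafkaOffset() throws Exception {
        ArrayList<String> result = new ArrayList<>();
        Connection conn = DruidUtils.getConnection();
        PreparedStatement ps = conn.prepareStatement("select topic, `partition`, `offset` from kafka");
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            // The sink stores the offset of the last record written, so resume from the next one.
            result.add(rs.getString("topic") + "-" + rs.getInt("partition") + "-" + (rs.getLong("offset") + 1));
        }
        rs.close();
        DruidUtils.close(conn, ps);
        return result;
    }
}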
Finally, we define a custom sink that writes the student records and the corresponding Kafka offsets to MySQL in a single transaction.
package com.nuc.transaction;
import com.nuc.jdbc.DruidUtils;
import com.nuc.test.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
/**
 * Sink that writes the students and the Kafka offsets to MySQL in one transaction.
 */
public class SinkMySQL extends RichSinkFunction<List<Student>> {
    private Connection conn;
    private PreparedStatement ps;
    private PreparedStatement ps1;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DruidUtils.getConnection();
        String sql = "insert into student(id, name, pass, age) values(?, ?, ?, ?)";
        ps = this.conn.prepareStatement(sql);
        // Upsert the offset: keep the larger of the stored and the new offset per (topic, partition).
        String sqlPro = "insert into kafka(topic,`partition`,`offset`,created_at,updated_at) values (?, ?, ?, now(), now()) "
                + "ON DUPLICATE KEY UPDATE `offset`=GREATEST(`offset`, VALUES(`offset`)), updated_at=now()";
        ps1 = conn.prepareStatement(sqlPro);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps1 != null) {
            ps1.close();
        }
        DruidUtils.close(conn, ps);
    }

    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        conn.setAutoCommit(false);
        // Batch the student rows.
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPass());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        // Keep only the highest offset seen per partition in this batch.
        final HashMap<Integer, Long> map = new HashMap<Integer, Long>();
        for (Student student : value) {
            final int partition = student.getPartition();
            final long offset = student.getOffset();
            if (map.containsKey(partition)) {
                final long offset1 = map.get(partition);
                if (offset > offset1) {
                    map.put(partition, offset);
                }
            } else {
                map.put(partition, offset);
            }
        }
        final Set<Integer> keySet = map.keySet();
        for (int key : keySet) {
            final long offset = map.get(key);
            ps1.setString(1, "test1");
            ps1.setInt(2, key);
            ps1.setLong(3, offset);
            ps1.addBatch();
        }
        // Execute both batches and commit them in the same transaction.
        int[] count = ps.executeBatch();
        int[] count1 = ps1.executeBatch();
        conn.commit();
        System.out.println("inserted " + count.length + " student rows");
        System.out.println("upserted " + count1.length + " offset rows");
    }
}
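The sink gets its connection from a DruidUtils helper that the post does not include either. A minimal sketch based on Druid's DruidDataSource, where the JDBC URL, user and password are placeholders to adjust for your environment:

package com.nuc.jdbc;

import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

/** Small wrapper around a Druid connection pool. */
public class DruidUtils {
    private static final DruidDataSource DATA_SOURCE = new DruidDataSource();

    static {
        // Placeholder connection settings; adjust to your environment.
        DATA_SOURCE.setUrl("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf8");
        DATA_SOURCE.setUsername("root");
        DATA_SOURCE.setPassword("root");
        DATA_SOURCE.setInitialSize(5);
        DATA_SOURCE.setMaxActive(20);
    }

    public static Connection getConnection() throws SQLException {
        return DATA_SOURCE.getConnection();
    }

    public static void close(Connection conn, Statement stmt) {
        try {
            if (stmt != null) {
                stmt.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

Because the student rows and the offset upserts are committed in the same MySQL transaction, a failure between writing the data and recording the offsets cannot leave the two out of sync: either both are visible or neither is, and on restart the job simply resumes from the offsets stored in the kafka table.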