Importing Data from MySQL into Kafka
1. Kafka Confluent
Introduction: link
...
2. Kafka Connect JDBC
Introduction: link
Install Kafka first, then download the Confluent package. By default the Confluent package already bundles Kafka, ZooKeeper, and the rest of the Kafka-related stack, so use whichever pieces your situation calls for. In my case I had already installed Apache Kafka separately.
Start the Schema Registry service.
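A minimal way to start it, assuming the default config file that ships with the Confluent package:

bin/schema-registry-start etc/schema-registry/schema-registry.properties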
Start the JDBC connector in standalone mode:
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/quickstart-mysql.properties
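The connect-avro-standalone.properties worker file is what wires Connect to the Schema Registry. The relevant entries look roughly like the excerpt below; the host/port values are assumptions for this setup, and the exact defaults can differ between Confluent versions:

bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081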
vi etc/kafka-connect-jdbc/quickstart-mysql.properties

name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.0.35:3306/test?user=root&password=
mode=incrementing
incrementing.column.name=id
topic.prefix=test-mysql
table.whitelist=t_resources

The source connector writes each whitelisted table to a topic named topic.prefix + table name, so the rows above land in the topic test-mysqlt_resources.
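To sanity-check the pipeline before writing any Java, you can consume the topic with the Avro console consumer bundled with Confluent (the broker and registry addresses are assumptions based on this setup):

bin/kafka-avro-console-consumer --bootstrap-server 192.168.0.83:9092 \
  --property schema.registry.url=http://192.168.0.83:8081 \
  --topic test-mysqlt_resources --from-beginning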
3. Reading the data written by the JDBC connector from Java
This requires Confluent's Avro serializer JAR:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.1</version>
</dependency>
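kafka-avro-serializer is not published to Maven Central, so you will likely also need Confluent's Maven repository declared in the pom:

<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>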
package com.test.kafka;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads the Avro records written by the JDBC source connector.
 * Reference: https://www.iteblog.com (2017-09-20).
 */
public class AvroKafkaConsumer09 {

    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger("AvroKafkaConsumer");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.83:9092");
        props.put("group.id", "testgroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The connector writes Avro-encoded values, so use the Confluent Avro
        // deserializer (not the plain StringDeserializer) plus the registry URL.
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://192.168.0.83:8081");

        // Topic name = topic.prefix + table name.
        String topic = "test-mysqlt_resources";

        KafkaConsumer<String, GenericData.Record> consumer = new KafkaConsumer<>(props);
        try {
            // Assign partition 0 explicitly and rewind to the beginning,
            // instead of using subscribe() with consumer-group management.
            TopicPartition partition = new TopicPartition(topic, 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seek(partition, 0L);

            while (true) {
                ConsumerRecords<String, GenericData.Record> records = consumer.poll(1000);
                for (ConsumerRecord<String, GenericData.Record> record : records) {
                    logger.info(record.key() + " > " + record.value().toString() + "\n");
                }
            }
        } finally {
            consumer.close();
        }
    }
}
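Each value arrives as an Avro GenericData.Record, so individual columns can be read by name. A small sketch; the "id" field is grounded in the connector config (incrementing.column.name=id), but any other field names depend on your t_resources table schema:

GenericData.Record value = record.value();
Object id = value.get("id"); // column from incrementing.column.name in the connector config
logger.info("id=" + id);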
4. Running Connect in distributed mode
1. connect-avro-distributed.properties
Note this configuration item:
plugin.path=/root/kafka/tools/confluent-4.0.0/share/java  # absolute path; /root/kafka/tools/confluent-4.0.0 is the Confluent install directory
2. Start connect-distributed
cd into the Confluent install directory:
[root@server confluent-4.0.0]# nohup ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties > /tmp/kafka-cluster-connect.log &
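Once the worker is up, its REST interface on port 8083 can confirm that the worker is running and that the JDBC plugin was picked up from plugin.path; both endpoints below are standard Connect REST API calls:

curl http://localhost:8083/                   # worker version info
curl http://localhost:8083/connector-plugins  # plugins discovered via plugin.path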
3. Managing connectors through the REST API
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "test-mysql-jdbc-autoincrement",
  "config": {
    "connector.class": "JdbcSourceConnector",
    "connection.url": "jdbc:mysql://192.168.0.15:3306/test?user=root&password=",
    "tasks.max": "1",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "test-mysql",
    "table.whitelist": "t_resources"
  }
}' http://localhost:8083/connectors
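The same REST API lists, inspects, and removes connectors; these are standard Connect endpoints:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/test-mysql-jdbc-autoincrement/status
curl -X DELETE http://localhost:8083/connectors/test-mysql-jdbc-autoincrement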
For details, see: link