package cn.lin.app;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

// Sends Avro-serialized messages to Kafka
public class AvroProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The Avro record is serialized to a byte array, so values use the ByteArraySerializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Parse the Avro schema from the .avsc file
        Schema.Parser parser = new Schema.Parser();
        String schemaFile = "E:\\Workspaces\\AvroData\\src\\main\\resources\\test.avsc";
        InputStream in = new FileInputStream(new File(schemaFile));
        Schema schema = parser.parse(in);
        in.close();

        // Bijection injection that converts GenericRecord <-> byte[]
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);

        // Build a record that matches the schema
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("version", "v1");
        avroRecord.put("server", "node1");
        avroRecord.put("service", "halo");
        avroRecord.put("domain", "halohaha");
        avroRecord.put("operation_type", "I");
        avroRecord.put("loadtime", "2020/05/25 00:00:00");
        avroRecord.put("guid", "1001");

        // The "data" field carries a JSON array serialized as a string
        JsonArray array = new JsonArray();
        JsonObject language1 = new JsonObject();
        language1.addProperty("id", 1);
        language1.addProperty("name", "java");
        language1.addProperty("ide", "Eclipse");
        array.add(language1);
        JsonObject language2 = new JsonObject();
        language2.addProperty("id", 2);
        language2.addProperty("name", "Swift");
        language2.addProperty("ide", "Xcode");
        array.add(language2);
        avroRecord.put("data", array.toString());

        // Serialize the record to bytes and send it repeatedly as a simple throughput test
        byte[] avroRecordBytes = recordInjection.apply(avroRecord);
        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("test", avroRecordBytes);
        producer.send(record).get();
        for (int i = 0; i <= 10000; i++) {
            producer.send(record).get();
        }
        producer.close();
    }
}
The Avro schema file is as follows:
test.avsc
{ "namespace": "org.mes.fn.action.quartzJob.KafkaTask", "type": "record", "name": "topic", "fields": [{ "name": "version", "type": ["string", "null"] }, { "name": "server", "type": "string" }, { "name": "service", "type": ["string", "null"] }, { "name": "domain", "type": ["string", "null"] }, { "name": "operation_type", "type": ["string", "null"] }, { "name": "loadtime", "type": ["string", "null"] }, { "name": "guid", "type": ["string", "null"] }, { "name": "data", "type": ["null", "string"] } ] }
A quick consumer to test it:
package cn.gtmc.app;

import java.io.*;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

// Deserializes the Avro messages
public class AvroConsumer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values arrive as raw Avro bytes, so use the ByteArrayDeserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Collections.singletonList("test"));

        // Parse the same schema that the producer used
        Schema.Parser parser = new Schema.Parser();
        String schemaFile = "E:\\Workspaces\\AvroData\\src\\main\\resources\\test.avsc";
        InputStream in = new FileInputStream(new File(schemaFile));
        Schema schema = parser.parse(in);
        in.close();

        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        while (true) {
            ConsumerRecords<String, byte[]> records = consumer.poll(100);
            for (ConsumerRecord<String, byte[]> record : records) {
                GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                System.out.println("KafkaTask.version = " + genericRecord.get("version") +
                        ",KafkaTask.server = " + genericRecord.get("server") +
                        ",KafkaTask.service = " + genericRecord.get("service") +
                        ",KafkaTask.domain = " + genericRecord.get("domain") +
                        ",KafkaTask.operation_type = " + genericRecord.get("operation_type") +
                        ",KafkaTask.loadtime = " + genericRecord.get("loadtime") +
                        ",KafkaTask.guid = " + genericRecord.get("guid") +
                        ",KafkaTask.data = " + genericRecord.get("data") +
                        ",partition = " + record.partition() +
                        ",offset = " + record.offset());
            }
        }
    }
}
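One caveat with this loop: in recent bijection versions, Injection.invert returns a scala.util.Try, so calling .get() on a message whose bytes do not match the schema throws and terminates the consumer. A small defensive variant of the loop body (a sketch, not taken from the original post):

scala.util.Try<GenericRecord> attempt = recordInjection.invert(record.value());
if (attempt.isSuccess()) {
    GenericRecord genericRecord = attempt.get();
    // process the record as above
} else {
    // skip messages that cannot be decoded with this schema instead of crashing the loop
    System.err.println("Failed to decode message at offset " + record.offset());
}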
Consuming the Avro-format Kafka data with Spark Streaming:
package cn.lin.flink

import java.io.{File, FileInputStream, InputStream}
import java.sql.{DriverManager, ResultSet}

import com.alibaba.fastjson.{JSON, JSONObject}
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.types.Row
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.Record
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Date 2019/8/8 10:47
 * Desc: Integrate Spark with the Kafka 0-10 connector, commit offsets manually, and keep them in MySQL
 */
object SparkKafkaDemo2 extends Serializable {
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext
    // spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) // cut the stream into one RDD every 5 seconds

    // Kafka connection parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:9092,node2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "SparkKafkaDemo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Topic name
    val topicName = "test"
    val topics = Array(topicName)

    // 2. Connect to Kafka with KafkaUtils and fetch the data
    // Note:
    // If MySQL has no offsets recorded, connect directly and consume from "latest"
    // If MySQL has offsets recorded, start consuming from those offsets
    val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaDemo", topicName)
    val recordDStream: InputDStream[ConsumerRecord[String, Array[Byte]]] = if (offsetMap.size > 0) {
      // Offsets were recorded in MySQL
      println("Offsets found in MySQL, consuming from those offsets")
      KafkaUtils.createDirectStream[String, Array[Byte]](ssc,
        LocationStrategies.PreferConsistent, // location strategy recommended by the Spark source; maps executors evenly to Kafka brokers
        ConsumerStrategies.Subscribe[String, Array[Byte]](topics, kafkaParams, offsetMap)) // consumer strategy recommended by the Spark source
    } else {
      // No offsets recorded
      println("No offsets recorded, connecting directly and consuming from latest")
      // /opt/soft/kafka/bin/kafka-console-producer.sh --broker-list node-01:9092 --topic spark_kafka
      KafkaUtils.createDirectStream[String, Array[Byte]](ssc,
        LocationStrategies.PreferConsistent, // location strategy recommended by the Spark source; maps executors evenly to Kafka brokers
        ConsumerStrategies.Subscribe[String, Array[Byte]](topics, kafkaParams)) // consumer strategy recommended by the Spark source
    }

    // 3. Process the data
    // Note: the goal is to maintain offsets ourselves, which means committing after each small batch is consumed.
    // Inside the DStream, each small batch is an RDD, so we operate on the RDDs
    // via transform (a transformation) or foreachRDD (an action).
    recordDStream.foreachRDD(rdd => {
      if (rdd.count() > 0) { // this batch has data
        // rdd.foreach(record => println("Received from Kafka: " + record))
        rdd.foreachPartition(iterator => {
          if (iterator != null && !iterator.isEmpty) {
            while (iterator.hasNext) {
              // Handle each record
              val next = iterator.next // the received ConsumerRecord
              val bytes: Array[Byte] = next.value()
              // Insert into a database, or write somewhere else
              Test.parseSchema(bytes)
            }
          }
        })
        // Example of a received record:
        // ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
        // Note: the record carries both the offsets we need to maintain and the data to process.
        // Once the data has been processed, maintain the offsets. Spark wraps the offset data in OffsetRange for us.
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (o <- offsetRanges) {
          println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
        }
        // The offsets could also be committed back to Kafka asynchronously:
        // recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        // In practice the offsets can be stored in MySQL/Redis
        OffsetUtil.saveOffsetRanges("SparkKafkaDemo", offsetRanges)
      }
    })

    /*
    val lineDStream: DStream[String] = recordDStream.map(_.value()) // _ is the ConsumerRecord
    val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) // _ is the value, i.e. one line
    val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
    result.print()
    */

    ssc.start()            // start
    ssc.awaitTermination() // wait for graceful shutdown
  }

  /*
  Helper object for maintaining offsets manually.
  First create the following table in MySQL:
  CREATE TABLE `t_offset` (
    `topic` varchar(255) NOT NULL,
    `partition` int(11) NOT NULL,
    `groupid` varchar(255) NOT NULL,
    `offset` bigint(20) DEFAULT NULL,
    PRIMARY KEY (`topic`,`partition`,`groupid`)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  */
  object OffsetUtil {

    /**
     * Read offsets from the database
     */
    def getOffsetMap(groupid: String, topic: String) = {
      val connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test?characterEncoding=UTF-8", "root", "root")
      val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
      pstmt.setString(1, groupid)
      pstmt.setString(2, topic)
      val rs: ResultSet = pstmt.executeQuery()
      val offsetMap = mutable.Map[TopicPartition, Long]()
      while (rs.next()) {
        offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
      }
      rs.close()
      pstmt.close()
      connection.close()
      offsetMap
    }

    /**
     * Save offsets to the database
     */
    def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
      val connection = DriverManager.getConnection("jdbc:mysql://node1:3306/test?characterEncoding=UTF-8", "root", "root")
      // "replace into" updates the row if it already exists, otherwise inserts it
      val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
      for (o <- offsetRange) {
        pstmt.setString(1, o.topic)
        pstmt.setInt(2, o.partition)
        pstmt.setString(3, groupid)
        pstmt.setLong(4, o.untilOffset)
        pstmt.executeUpdate()
      }
      pstmt.close()
      connection.close()
    }
  }
}
Here is the parsing part:
package cn.lin.flink;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.io.*;

public class Test {

    public static void parseSchema(byte[] value) throws IOException {
        // Parse the schema and rebuild the GenericRecord from the raw bytes
        Schema.Parser parser = new Schema.Parser();
        String schemaFile = "E:\\Workspaces\\AvroData\\src\\main\\resources\\test.avsc";
        InputStream in = new FileInputStream(new File(schemaFile));
        Schema schema = parser.parse(in);
        in.close();
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        GenericRecord genericRecord = recordInjection.invert(value).get();
        Object data = genericRecord.get("data");
        System.out.println(data);
        insertHbase(data);
    }

    public static void insertHbase(Object data) {
        // Placeholder: write the record into HBase here
        System.out.println("Inserting into HBase...");
    }
}
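insertHbase is only a stub in the original code. A minimal sketch of what a real write could look like with the standard HBase client API is shown below; the table name avro_data, the column family cf, and the row key parameter are all assumptions for illustration, and an hbase-client dependency would need to be added to the pom. Note also that parseSchema re-reads the schema file for every record; caching the parsed Schema and the Injection in a static field avoids that overhead on the executors.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseSink {
    // Writes the "data" field of one record into HBase; table and column names are illustrative
    public static void insertHbase(String rowKey, Object data) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1,node2"); // assumed ZooKeeper quorum
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("avro_data"))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("data"), Bytes.toBytes(String.valueOf(data)));
            table.put(put);
        }
    }
}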
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.lin</groupId> <artifactId>FlinkDemo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink.version>1.7.2</flink.version> <scala.binary.version>2.11</scala.binary.version> <kafka.version>2.2.0</kafka.version> </properties> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.28</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.11</artifactId> <version>0.9.6</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.6</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <build> <plugins> <!-- 该插件用于将Scala代码编译成class文件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <!-- 声明绑定到maven的compile阶段 --> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
In testing on my local machine, this setup consumed about 10,000 records per second.