Overview: a summary of the problems encountered and lessons learned while consuming Kafka data with the Direct Approach on Spark 1.5.2 + Kafka 0.8+.
Integrating Spring
Bringing Spring into a Spark application makes it easy to manage configuration, data sources, and so on. Thanks to Scala's singleton objects, a standalone utility that loads the Spring IoC container is all that is needed; the rest can be handled in the configuration file. I did not try annotation-driven configuration and see no need for it here: Spring is used only to manage a set of configuration files and to wire a few special beans.
import org.springframework.context.support.ClassPathXmlApplicationContext

object SpringUtil {
  // Loaded once per JVM; Scala objects are lazily initialized singletons.
  val context = new ClassPathXmlApplicationContext("spring.xml")
}
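Any other object can then pull beans out of the shared container. A minimal sketch, assuming a Config bean is declared in spring.xml (as it is in the snippets further down):

object AppConfig {
  // Hypothetical consumer of the shared IoC container; resolved once per JVM.
  val config: Config = SpringUtil.context.getBean(classOf[Config])
}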
Integrating Elasticsearch
In my experience, writing to Elasticsearch through the Spark RDD support of the Elasticsearch for Apache Hadoop module is no faster than calling Elasticsearch's Bulk API directly; under heavy real-time traffic, writing through the Bulk API is the more efficient path. This is also where Spring starts to pay off.
import java.net.InetAddress

import org.apache.commons.logging.LogFactory
import org.elasticsearch.action.bulk.{ BackoffPolicy, BulkProcessor }
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.unit.{ ByteSizeValue, TimeValue }

object ElasticsearchUtil {
  val log = LogFactory.getLog(ElasticsearchUtil.getClass)
  val config = SpringUtil.context.getBean(classOf[Config])

  val settings = Settings.builder().put("cluster.name", config.getEsCluster)
  val client = TransportClient.builder().settings(settings).build()
  for (node <- config.getEsNodes.split(",")) {
    val iport = node.split(":")
    if (iport.length == 1) {
      client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(iport(0)), 9300))
    } else if (iport.length == 2) {
      client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(iport(0)), iport(1).toInt))
    } else {
      log.warn("Illegal node configuration: " + node + "!!")
    }
  }

  val bulkProcessor = BulkProcessor.builder(client, new BluBulkProcessorListener())
    // flush once this many actions have been buffered
    .setBulkActions(config.getBulkActions)
    // flush once the buffered requests reach this size
    .setBulkSize(new ByteSizeValue(config.getBulkSize))
    // flush at this interval even if the thresholds above are not reached
    .setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushInterval))
    .setConcurrentRequests(config.getBulkConcurrentRequest)
    // back off and retry: three attempts, one second apart
    .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3))
    .setName("bulkinstall")
    .build()
}
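With the shared BulkProcessor in place, writing becomes a one-liner from anywhere in the job. A minimal, hedged sketch (the index name, type name, and JSON payload are placeholders):

import org.elasticsearch.action.index.IndexRequest

// Hand a document to the shared BulkProcessor, which batches and flushes it
// according to the thresholds configured above.
def indexJson(json: String): Unit =
  ElasticsearchUtil.bulkProcessor.add(new IndexRequest("logs-2016", "log").source(json))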
Integrating Kafka
Here the direct approach provided by the Spark Kafka module is used to read Kafka partition data into RDDs; if the consumption state needs to be remembered, the offsets must be maintained by hand. Below is a simple wrapper: it reuses the KafkaCluster API from the Spark Streaming Kafka module with only minor changes and wraps KafkaUtils, exposing a switch that controls whether historical data may be consumed while also implementing the auto.offset.reset semantics.
The only reason KafkaCluster had to be modified is that several of its members are private; after a small change it can be used directly. It is mainly used to talk to the Kafka cluster, giving convenient access to cluster metadata and consumer offsets.
import java.util.Properties

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

import kafka.api._
import kafka.common.{ ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition }
import kafka.consumer.{ ConsumerConfig, SimpleConsumer }

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  import KafkaCluster.{ Err, LeaderOffset, SimpleConsumerConfig }

  // ConsumerConfig isn't serializable
  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)

  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))

  // Metadata api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
  // scalastyle:on

  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, Seq(topic))
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.find(_.partitionId == partition)
      }.foreach { pm: PartitionMetadata =>
        pm.leader.foreach { leader =>
          return Right((leader.host, leader.port))
        }
      }
    }
    Left(errs)
  }

  def findLeaders(
      topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    val topics = topicAndPartitions.map(_.topic)
    val response = getPartitionMetadata(topics).right
    val answer = response.flatMap { tms: Set[TopicMetadata] =>
      val leaderMap = tms.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
          val tp = TopicAndPartition(tm.topic, pm.partitionId)
          if (topicAndPartitions(tp)) {
            pm.leader.map { l =>
              tp -> (l.host -> l.port)
            }
          } else {
            None
          }
        }
      }.toMap
      if (leaderMap.keys.size == topicAndPartitions.size) {
        Right(leaderMap)
      } else {
        val missing = topicAndPartitions.diff(leaderMap.keySet)
        val err = new Err
        err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
        Left(err)
      }
    }
    answer
  }

  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
    getPartitionMetadata(topics).right.map { r =>
      r.flatMap { tm: TopicMetadata =>
        tm.partitionsMetadata.map { pm: PartitionMetadata =>
          TopicAndPartition(tm.topic, pm.partitionId)
        }
      }
    }
  }

  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
    val req = TopicMetadataRequest(
      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp: TopicMetadataResponse = consumer.send(req)
      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

      if (respErrs.isEmpty) {
        return Right(resp.topicsMetadata.toSet)
      } else {
        respErrs.foreach { m =>
          val cause = ErrorMapping.exceptionFor(m.errorCode)
          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
          errs.append(new SparkException(msg, cause))
        }
      }
    }
    Left(errs)
  }

  // Leader offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
  // scalastyle:on

  def getLatestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)

  def getEarliestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }

  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
    m.groupBy(_._2).map { kv =>
      kv._1 -> kv._2.keys.toSeq
    }

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long,
      maxNumOffsets: Int): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs.append(new SparkException(
                  s"Empty offsets for ${tp}, is ${before} before log beginning?"))
              }
            } else {
              errs.append(ErrorMapping.exceptionFor(por.error))
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
      Left(errs)
    }
  }

  // Consumer offset api
  // scalastyle:off
  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  // scalastyle:on

  // this 0 here indicates api version, in this case the original ZK backed api.
  private def defaultConsumerApiVersion: Short = 0

  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsets(
      groupId: String,
      topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]] =
    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsets(
      groupId: String,
      topicAndPartitions: Set[TopicAndPartition],
      consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Long]] = {
    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
      r.map { kv =>
        kv._1 -> kv._2.offset
      }
    }
  }

  /** Requires Kafka >= 0.8.1.1 */
  def getConsumerOffsetMetadata(
      groupId: String,
      topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)

  def getConsumerOffsetMetadata(
      groupId: String,
      topicAndPartitions: Set[TopicAndPartition],
      consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
    val errs = new Err
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.fetchOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
          if (ome.error == ErrorMapping.NoError) {
            result += tp -> ome
          } else {
            errs.append(ErrorMapping.exceptionFor(ome.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
    Left(errs)
  }

  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsets(
      groupId: String,
      offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)

  def setConsumerOffsets(
      groupId: String,
      offsets: Map[TopicAndPartition, Long],
      consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map { kv =>
      kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  /** Requires Kafka >= 0.8.1.1 */
  def setConsumerOffsetMetadata(
      groupId: String,
      metadata: Map[TopicAndPartition, OffsetAndMetadata]): Either[Err, Map[TopicAndPartition, Short]] =
    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)

  def setConsumerOffsetMetadata(
      groupId: String,
      metadata: Map[TopicAndPartition, OffsetAndMetadata],
      consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  // Try a call against potentially multiple brokers, accumulating errors
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }
}

object KafkaCluster {
  type Err = ArrayBuffer[Throwable]

  /** If the result is right, return it, otherwise throw SparkException */
  def checkErrors[T](result: Either[Err, T]): T = {
    result.fold(
      errs => throw new SparkException(errs.mkString("\n")),
      ok => ok)
  }

  case class LeaderOffset(host: String, port: Int, offset: Long)

  /**
   * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
   * Simple consumers connect directly to brokers, but need many of the same configs.
   * This subclass won't warn about missing ZK params, or presence of broker params.
   */
  class SimpleConsumerConfig private (brokers: String, originalProps: Properties)
      extends ConsumerConfig(originalProps) {
    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
      val hpa = hp.split(":")
      if (hpa.size == 1) {
        throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
      }
      (hpa(0), hpa(1).toInt)
    }
  }

  object SimpleConsumerConfig {
    /**
     * Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
      // These keys are from other pre-existing kafka configs for specifying brokers, accept either
      val brokers = kafkaParams.get("metadata.broker.list")
        .orElse(kafkaParams.get("bootstrap.servers"))
        .getOrElse(throw new SparkException(
          "Must specify metadata.broker.list or bootstrap.servers"))

      val props = new Properties()
      kafkaParams.foreach { case (key, value) =>
        // prevent warnings on parameters ConsumerConfig doesn't know about
        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
          props.put(key, value)
        }
      }

      Seq("zookeeper.connect", "group.id").foreach { s =>
        if (!props.containsKey(s)) {
          props.setProperty(s, "")
        }
      }

      new SimpleConsumerConfig(brokers, props)
    }
  }
}

class SparkException(message: String, cause: Throwable) extends Exception(message, cause) {
  def this(message: String) = this(message, null)
}
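A small, hedged sketch of how the modified class can be exercised on its own (broker addresses, group id, and topic name are placeholders):

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092,broker2:9092"))

// Resolve all partitions of a topic, throwing a SparkException on failure.
val partitions = KafkaCluster.checkErrors(kc.getPartitions(Set("my-topic")))

// Committed offsets for a consumer group; a brand-new group comes back as Left(errors).
val committed = kc.getConsumerOffsets("my-group", partitions)

// Earliest and latest offsets currently available on the partition leaders.
val earliest = kc.getEarliestLeaderOffsets(partitions)
val latest   = kc.getLatestLeaderOffsets(partitions)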
MYKafkaUtils contains the core logic for manually committing offsets (comments have been added below) and implements the auto.offset.reset semantics.
import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.commons.logging.{ Log, LogFactory }
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{ HasOffsetRanges, KafkaUtils }

import KafkaCluster.LeaderOffset

object MYKafkaUtils {
  val log: Log = LogFactory.getLog(MYKafkaUtils.getClass)
  val config = SpringUtil.context.getBean(classOf[Config])
  val kafkaCluster = new KafkaCluster(Map("metadata.broker.list" -> config.getKafkaBrokerList))

  /**
   * Create a direct stream. When enabledHistory is true the stream starts from the
   * offsets previously committed by this group; otherwise offsets are not tracked.
   */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String],
      enabledHistory: Boolean): InputDStream[(K, V)] = {
    // implements the auto.offset.reset semantics
    val autoOffsetReset = kafkaParams.getOrElse("auto.offset.reset", "largest")
    val groupId = kafkaParams.getOrElse("group.id", "group.id")
    if (!enabledHistory) {
      // don't record offsets
      KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
    } else {
      var fromOffsets = Map[TopicAndPartition, Long]()
      val partitions = kafkaCluster.getPartitions(topics)
      // failed to get partition info
      if (partitions.isLeft) {
        throw new SparkException(s"get kafka partitions failed: ${partitions.left.get}")
      }
      // get the committed consumer offsets
      val consumerOffsets = kafkaCluster.getConsumerOffsets(groupId, partitions.right.get)
      consumerOffsets match {
        // no committed offsets, probably a new group: fall back to auto.offset.reset
        case Left(err) =>
          log.error(s"get kafka offsets failed: ${err}")
          var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
          autoOffsetReset match {
            case "smallest" =>
              leaderOffsets = kafkaCluster.getEarliestLeaderOffsets(partitions.right.get).right.get
            case "largest" =>
              leaderOffsets = kafkaCluster.getLatestLeaderOffsets(partitions.right.get).right.get
            case _ =>
              throw new SparkException("auto.offset.reset not in ['smallest','largest']")
          }
          leaderOffsets.foreach {
            case (topicPartition, offset) =>
              fromOffsets += (topicPartition -> offset.offset)
          }
          // persist the starting offsets for this group
          kafkaCluster.setConsumerOffsets(groupId, fromOffsets)
        case Right(offsets) =>
          // clamp committed offsets to the earliest offsets still available on the brokers
          val earliestOffsets = kafkaCluster.getEarliestLeaderOffsets(partitions.right.get).right.get
          offsets.foreach {
            case (topicPartition, offset) =>
              val earliestOffset = earliestOffsets(topicPartition).offset
              // the committed offset must not be older than the earliest available offset
              if (offset < earliestOffset) {
                log.warn("committed offset is older than the earliest available offset, using the earliest offset")
                fromOffsets += (topicPartition -> earliestOffset)
              } else {
                fromOffsets += (topicPartition -> offset)
              }
          }
      }
      val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
      // create the DStream from the resolved offsets
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
  }

  /** Commit the offset ranges of an already-processed RDD for the given group. */
  def updateOffsets[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], groupId: String): Unit = {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsetRange <- offsetRanges) {
      kafkaCluster.setConsumerOffsets(groupId,
        Map(TopicAndPartition(offsetRange.topic, offsetRange.partition) -> offsetRange.untilOffset))
    }
  }
}
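A hedged usage sketch (ssc, kafkaParams, the group id, topic name, and handleRecord are placeholders assumed to exist in the surrounding application): offsets are committed only after the batch has been written out, which yields at-least-once delivery.

val stream = MYKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"), enabledHistory = true)

def handleRecord(kv: (String, String)): Unit = println(kv._2) // placeholder sink

stream.foreachRDD { rdd =>
  rdd.foreachPartition(_.foreach(handleRecord)) // write the batch out first
  MYKafkaUtils.updateOffsets(rdd, "my-group")   // then commit its offset ranges
}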
One more pitfall, the biggest of all
Sharing data across nodes in a distributed system involves complex serialization, and failures happen often and are hard to pin down. A Spark application should therefore avoid sharing variables held on the Driver, or at least avoid letting code that may run on Worker nodes reference variables that live on the Driver.
The following code runs well. In Scala, lean on (companion) objects for the singleton pattern so that resources are instantiated on the Worker nodes whenever possible, rather than being serialized on the Driver and shipped to the Workers; a small sketch of this pattern follows the code.
object MainApp extends App {
  val log: Log = LogFactory.getLog(MainApp.getClass)
  val sparkDefaultConfig = SpringUtil.context.getBean("spark-default", classOf[Properties])
  val sparkConfig = new SparkConf()
  // default configuration from properties
  sparkConfig.setAll(sparkDefaultConfig)
  val config = SpringUtil.context.getBean(classOf[Config])
  log.info("config:" + config)

  val kafkaParams = Map(
    "metadata.broker.list" -> config.getKafkaBrokerList,
    "group.id" -> config.getKafkaGroupId,
    "auto.offset.reset" -> config.getKafkaAutoOffsetReset)
  val ssc = new StreamingContext(sparkConfig, Duration.apply(config.getSparkStreamingDuration))
  val topics = config.getKafkaTopic.toSet
  val consumerHistory = config.isBluConsumerHistory()

  val input = MYKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics, consumerHistory)

  input.map(kv => kv._2)
    .map { msg => BluMsgUtil.analysisMsg(msg) }
    .filter { msg => msg != null }
    .foreachRDD(msgs => msgs.foreach { msg => BluMsgUtil.msgToActionRequest(msg) })

  if (consumerHistory) {
    input.foreachRDD(rdd => {
      MYKafkaUtils.updateOffsets(rdd, config.getKafkaGroupId)
    })
  }

  try {
    ssc.start()
    ssc.awaitTermination()
  } catch {
    case ex: Exception => log.error(ex)
  }
}
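For completeness, a hedged sketch of the pattern described above (EsSink is a hypothetical helper, and the index and type names are placeholders): heavy, non-serializable resources live in a Scala object, so nothing has to be serialized on the Driver; each executor JVM initializes the object the first time its code touches it.

import org.elasticsearch.action.index.IndexRequest

object EsSink {
  // First referenced inside foreachPartition, i.e. on an executor, so the
  // TransportClient/BulkProcessor are constructed there rather than on the Driver.
  lazy val bulk = ElasticsearchUtil.bulkProcessor
  def write(doc: String): Unit = bulk.add(new IndexRequest("logs-2016", "log").source(doc))
}

// Reusing the input stream from MainApp: only the record values cross the wire,
// the Elasticsearch client never does.
input.map(_._2).foreachRDD { rdd =>
  rdd.foreachPartition(_.foreach(EsSink.write))
}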
Summary
After dodging pits for this long, I am still sitting in one.