问题需求:在实际工程中,比如淘宝等,重复消费可能导致重复支付问题,导致用户的RMB损失。
解决办法:利用redis,将消费过的数据存起来,并设置失效时间,以及消费的标志位,消费过的数据标志位为1,未消费的数据标志位为0。重启程序后,消费数据前利用redis判断数据是否被消费过,将消费过的数据过滤掉。
选择redis的原因:redis基于内存,对程序的开销影响不大。
代码1:在kafka获取数据并判断
package sparkStreamingRedis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import Utils.RedisUtils
import redis.clients.jedis.Jedis
object sparkStreamingKafka {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaSparkStreaming")
val context = new SparkContext(conf)
context.setLogLevel("WARN") //设置日志级别
val ssc = new StreamingContext(context , Seconds(5))
var locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent //定义kafka的位置策略
val brokers = "spark1:9092" //kafka的brokers
val topic = "sparkdemo" //kafka的主题
val group = "sparkaGroup" //kafka的分组
val kafkaParam = Map( //定义kafka的配置信息
"bootstrap.servers"-> brokers,
"key.deserializer" ->classOf[StringDeserializer],
"value.deserializer"->classOf[StringDeserializer],
"group.id"->group,
"auto.offset.reset"-> "latest",
"enable.auto.commit" ->(false:java.lang.Boolean)
);
var consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(Array(topic), kafkaParam)//消费策略
var resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy) //从kafka获取数据
resultDStream.foreachRDD(iter=>{ //使用foreachPartition遍历每个分片的方式
if(iter.count() > 0){
iter.foreachPartition{partitionOfRecords =>{
partitionOfRecords.foreach(record=>{
val text=record.value()
val label=text+": "+record.offset()
val redisConn:Jedis=RedisUtils.getContion() //获取一个redis连接
if(redisConn.get(label)==null){ //key对应的value为空,说明redis中不存在该数据
redisConn.set(label,"0","NX","EX",120) //数据写入redis,设置失效时间为120秒
println(text) //这里打印一下数据就代表消费数据了(简单一点)
redisConn.setrange(label,0,"1") //setrange方法只会覆盖原value的值,不会覆盖失效时间
}else if(redisConn.get(label)=="0"){ //为"0"时只写入的数据,但是并未消费数据
println(text) //
redisConn.setrange(label,0,"1")
}else{ //当为"1"的时候,不需要做任何操作,继续迭代
}
RedisUtils.returnConn(redisConn) //将redis连接释放掉
})
}
}
val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges
resultDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
}
})
ssc.start()
ssc.awaitTermination()
}
}
代码2:redis工具类,定义redis的相关属性和方法
package Utils
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object RedisUtils {
val conf=new JedisPoolConfig()
conf.setMaxTotal(10) //设置最大连接数
conf.setMaxIdle(5) //设置最大空闲数
conf.setTestOnBorrow(true) //当调用borrow object 方法时,是否进行有效性验证
val redisPool=new JedisPool(conf,"192.168.116.189",6379,3000,"test") //定义pool配置,配置信息,主机IP,端口,超时时间,密码
def getContion(): Jedis = { //从连接池获取redis连接
val redisConn:Jedis=redisPool.getResource()
redisConn
}
def returnConn(redisConn:Jedis){ //释放redis连接
redisPool.returnResource(redisConn)
}
}
注:有什么问题可以留言,看到会回复的