版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/someby/article/details/88980797
目录
本篇文章记录广告点击流量实时统计-为动态黑名单实时计算每天各用户对各广告的点击次数。
代码
util
DateUtils.java
package graduation.java.util; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; /** * FileName: DateUtils * Author: hadoop * Email: [email protected] * Date: 19-2-25 下午7:09 * Description:日期时间工具类 */ public class DateUtils { public static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd"); public static final SimpleDateFormat DATEKEY_FORMAT = new SimpleDateFormat("yyyyMMdd"); /** * 判断一个时间是否是在另一个时间之前 * @param time1 第一个时间 * @param time2 第二个时间 * @return 返回判断结果 */ public static boolean before(String time1,String time2){ try { Date dateTime1 = TIME_FORMAT.parse(time1); Date dateTime2 = TIME_FORMAT.parse(time2); if (dateTime1.before(dateTime2)){ return true; } }catch (Exception e){ e.printStackTrace(); } return false; } /** * 判断time1是否在time2的后面 * @param time1 第一个时间 * @param time2 第二个时间 * @return 返回判断结果 */ public static boolean after(String time1,String time2){ try{ Date dateTime1 = TIME_FORMAT.parse(time1); Date dateTime2 = TIME_FORMAT.parse(time2); if (dateTime1.after(dateTime2)){ return true; } }catch(Exception e){ e.printStackTrace(); } return false; } /** * 计算两个时间的差值(单位为秒)" * @param time1 时间1 * @param time2 时间2 * @return 返回两个时间的差值,单位为秒 */ public static int minus(String time1,String time2){ try{ Date dateTime1 = TIME_FORMAT.parse(time1); Date dateTime2 = TIME_FORMAT.parse(time2); long millisecond = dateTime1.getTime() - dateTime2.getTime(); return Integer.valueOf(String.valueOf(millisecond)); }catch (Exception e){ e.printStackTrace(); } return 0; } /** * 获取年月日和小时 * @param datetime 传入的时间格式(yyyy-MM-dd HH:mm:ss) * @return 结果时间 */ public static String getDateHour(String datetime){ String date = datetime.split(" ")[0]; String hourMinuteSecond = datetime.split(" ")[1]; String hour = hourMinuteSecond.split(":")[0]; return date+"_"+hour; } /** * 获取丹田日期(yyyy-MM-dd) * @return 当天日期 */ public static String getTodayDate(){ return DATE_FORMAT.format(new Date()); } /** * 获取昨天的日期(yyyy-MM-dd) * @return 昨天的日期 */ public static String getYesterdayDate(){ Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); cal.add(Calendar.DAY_OF_YEAR,-1); Date date = cal.getTime(); return DATE_FORMAT.format(date); } /** * 格式化日期(yyyy-MM-dd) * @param date DAte对象 * @return 格式化后的对象 */ public static String formatDate(Date date){ return DATE_FORMAT.format(date); } /** * 格式化时间(yyyy-MM-dd HH:mm:ss) * @param date date Date 对象 * @return 格式化后的时间 */ public static String formateTime(Date date){ return TIME_FORMAT.format(date); } /** * 转化时间字符串 * @param time 时间字符串 * @return Date */ public static Date parseTime(String time){ try { return DATE_FORMAT.parse(time); } catch (Exception e){ e.printStackTrace(); } return null; } /** * 格式化日期key * @param date * @return */ public static String formatDateKey(Date date){ return DATEKEY_FORMAT.format(date); } /** * 格式化日期key * @param dateKey * @return */ public static Date parseDateKey(String dateKey){ try{ return DATEKEY_FORMAT.parse(dateKey); }catch(ParseException e){ e.printStackTrace(); } return null; } /** * 格式化时间,保留到分钟级别 * @param date * @return */ public static String formatTimeMinute(Date date){ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm"); return sdf.format(date); } /* public static void main(String[] args){ String t1 = "1551095261"; String t2 = "1551095298"; try { Date time1 = TIME_FORMAT.parse(t1); Date time2 = TIME_FORMAT.parse(t2); System.out.println("Time1: "+t1 + " time1:"+time1); System.out.println("Time2: "+t2 +" time2: "+time2); } catch (ParseException e) { e.printStackTrace(); } boolean beforeResult = before(t2,t1); System.out.println("The result of before:"+beforeResult); } */ }
spark.ad
AdClickRealTimeStatSpark.java
package graduation.java.spark.ad; import graduation.java.conf.ConfigurationManager; import graduation.java.constant.Constants; import graduation.java.util.DateUtils; import graduation.java.util.SparkUtils; import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.util.*; /** * FileName: AdClickRealTimeStatSpark * Author: hadoop * Email: [email protected] * Date: 19-4-2 下午7:27 * Description: * 广告点击流量实时统计spark作业 * */ public class AdClickRealTimeStatSpark { public static void main(String[] args) throws InterruptedException { //构建spark Streaming上下文 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_AD); SparkUtils.setMaster(conf); // spark streaming的上下文是构建JavaStreamingContext对象 // 而不是像之前的JavaSparkContext、SQLContext/HiveContext // 传入的第一个参数,和之前的spark上下文一样,也是SparkConf对象;第二个参数则不太一样 // 第二个参数是spark streaming类型作业比较有特色的一个参数 // 实时处理batch的interval // spark streaming,每隔一小段时间,会去收集一次数据源(kafka)中的数据,做成一个batch // 每次都是处理一个batch中的数据 // 通常来说,batch interval,就是指每隔多少时间收集一次数据源中的数据,然后进行处理 // 一遍spark streaming的应用,都是设置数秒到数十秒(很少会超过1分钟) // 咱们这里项目中,就设置5秒钟的batch interval // 每隔5秒钟,咱们的spark streaming作业就会收集最近5秒内的数据源接收过来的数据 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 正式开始进行代码的编写 // 实现咱们需要的实时计算的业务逻辑和功能 // 创建针对Kafka数据来源的输入DStream(离线流,代表了一个源源不断的数据来源,抽象) // 选用kafka direct api(很多好处,包括自己内部自适应调整每次接收数据量的特性,等等) // 构建kafka参数map // 主要要放置的就是,你要连接的kafka集群的地址(broker集群的地址列表) Map<String,String> kafkaParams = new HashMap<String,String>(); kafkaParams.put(Constants.KAFAK_METADATA_BROKER_LIST, ConfigurationManager.getProperty(Constants.KAFAK_METADATA_BROKER_LIST)); //构建topics set String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICES); String[] kafkaTopicsSplited = kafkaTopics.split(","); Set<String> topics = new HashSet<String>(); for (String kafkaTopic : kafkaTopicsSplited){ topics.add(kafkaTopic); } // 基于kafka direct api模式,构建出了针对kafka集群中指定topic的输入DStream // 两个值,val1,val2;val1没有什么特殊的意义;val2中包含了kafka topic中的一条一条的实时日志数据 JavaPairInputDStream<String,String> adRealTimeLogDStream = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); // 一条一条的实时日志 // timestamp province city userid adid // 某个时间点 某个省份 某个城市 某个用户 某个广告 // 计算出每5个秒内的数据中,每天每个用户每个广告的点击量 // 通过对原始实时日志的处理 // 将日志的格式处理成<yyyyMMdd_userid_adid, 1L>格式 JavaPairDStream<String,Long> dailyUserAdClickDStream = adRealTimeLogDStream.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<String, String> tuple) throws Exception { //从tuple中拿取一条日志 String log = tuple._2; String[] logSplited = log.split(","); //提取出日期(yyyyMMdd),userid,adid String timetamp = logSplited[0]; Date date = new Date(Long.valueOf(timetamp)); String dateKey = DateUtils.formatDateKey(date); long userid = Long.valueOf(logSplited[3]); long adid = Long.valueOf(logSplited[4]); //拼接key String key = dateKey + "_"+ userid+"_" + adid; return new Tuple2<String,Long>(key,1L); } }); // 针对处理后的日志格式,执行reduceByKey算子即可 // (每个batch中)每天每个用户对每个广告的点击量 JavaPairDStream<String,Long> dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(new Function2<Long, Long, Long>() { private static final long serialVersionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1+v2; } }); // 到这里为止,获取到了什么数据呢? // dailyUserAdClickCountDStream DStream // 源源不断的,每个5s的batch中,当天每个用户对每支广告的点击次数 // <yyyyMMdd_userid_adid, clickCount> // 构建完spark streaming上下文之后,记得要进行上下文的启动、等待执行结束、关闭 jssc.start(); jssc.awaitTermination(); jssc.stop(); } }