版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/someby/article/details/89028913
本篇文章记录广告点击流量实时统计-计算每天各省的top3热门广告。
代码
domain
AdProvinceTop3.java
package graduation.java.domain; /** * FileName: AdProvinceTop3 * Author: hadoop * Email: [email protected] * Date: 19-4-4 下午4:32 * Description: * 各省份广告点击top3热门广告 */ public class AdProvinceTop3 { private String date; private String province; private long adid; private long clickCount; public String getDate() { return date; } public void setDate(String date) { this.date = date; } public String getProvince() { return province; } public void setProvince(String province) { this.province = province; } public long getAdid() { return adid; } public void setAdid(long adid) { this.adid = adid; } public long getClickCount() { return clickCount; } public void setClickCount(long clickCount) { this.clickCount = clickCount; } @Override public String toString() { return "AdProvinceTop3{" + "date='" + date + '\'' + ", province='" + province + '\'' + ", adid=" + adid + ", clickCount=" + clickCount + '}'; } }
dao
IAdProvinceTop3DAO.java
package graduation.java.dao;

import graduation.java.domain.AdProvinceTop3;

import java.util.List;

/**
 * FileName: IAdProvinceTop3DAO
 * Author: hadoop
 * Email: [email protected]
 * Date: 19-4-4 下午4:34
 * Description:
 * Data-access contract for the per-province top-3 hot ads.
 */
public interface IAdProvinceTop3DAO {

    /**
     * Batch-updates the stored per-province top-3 hot ads with the given rows.
     *
     * @param topAds the ranking rows to persist
     */
    void updateBatch(List<AdProvinceTop3> topAds);
}
impl
AdProvinceTop3DAOImpl.java
package graduation.java.impl;

import graduation.java.dao.IAdProvinceTop3DAO;
import graduation.java.domain.AdProvinceTop3;
import graduation.java.jdbc.JDBCHelper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * FileName: AdProvinceTop3DAOImpl
 * Author: hadoop
 * Email: [email protected]
 * Date: 19-4-4 下午4:37
 * Description:
 * JDBC implementation of the per-province top-3 hot-ad DAO (table ad_province_top3).
 */
public class AdProvinceTop3DAOImpl implements IAdProvinceTop3DAO {

    /** Removes every stored row of one (date, province) group. */
    private static final String DELETE_SQL =
            "DELETE FROM ad_province_top3 WHERE date=? AND province=?";

    /** Positional insert: date, province, ad id, click count. */
    private static final String INSERT_SQL =
            "INSERT INTO ad_province_top3 VALUES(?,?,?,?)";

    /**
     * Replaces the stored ranking for every (date, province) present in the input:
     * first deletes all existing rows of those groups, then inserts the given rows.
     *
     * @param adProvinceTop3List rows to persist; a null or empty list is a no-op
     */
    @Override
    public void updateBatch(List<AdProvinceTop3> adProvinceTop3List) {
        // Nothing to replace: avoid issuing two empty JDBC batches.
        if (adProvinceTop3List == null || adProvinceTop3List.isEmpty()) {
            return;
        }
        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeBatch(DELETE_SQL, buildDeleteParams(adProvinceTop3List));
        jdbcHelper.executeBatch(INSERT_SQL, buildInsertParams(adProvinceTop3List));
    }

    /** Builds one DELETE parameter pair per distinct (date, province), in first-seen order. */
    private static List<Object[]> buildDeleteParams(List<AdProvinceTop3> rows) {
        // A list-valued key avoids the "date_province" join/split round-trip,
        // which broke whenever a value itself contained '_'; the set also makes
        // de-duplication O(n) instead of the O(n^2) List.contains scan.
        Set<List<String>> dateProvinces = new LinkedHashSet<List<String>>();
        for (AdProvinceTop3 row : rows) {
            dateProvinces.add(Arrays.asList(row.getDate(), row.getProvince()));
        }
        List<Object[]> params = new ArrayList<Object[]>(dateProvinces.size());
        for (List<String> dateProvince : dateProvinces) {
            params.add(new Object[]{dateProvince.get(0), dateProvince.get(1)});
        }
        return params;
    }

    /** Builds one INSERT parameter tuple per input row, preserving order. */
    private static List<Object[]> buildInsertParams(List<AdProvinceTop3> rows) {
        List<Object[]> params = new ArrayList<Object[]>(rows.size());
        for (AdProvinceTop3 row : rows) {
            params.add(new Object[]{
                    row.getDate(),
                    row.getProvince(),
                    row.getAdid(),
                    row.getClickCount()});
        }
        return params;
    }
}
factory
DAOFactory.java
/**
 * Creates the DAO that manages the per-province top-3 hot ads.
 *
 * @return a fresh {@link IAdProvinceTop3DAO} instance
 */
public static IAdProvinceTop3DAO getAdProvinceDAO() {
    IAdProvinceTop3DAO dao = new AdProvinceTop3DAOImpl();
    return dao;
}
spark.ad
AdClickRealTimeStatSpark.java
/** * 计算每天各省份top3热门广告 * @param adRealTimeStatDStream */ private static void calculateProvinceTop3Ad(JavaPairDStream<String,Long> adRealTimeStatDStream){ // adRealTimeStatDStream // 每一个batch rdd,都代表了最新的全量的每天各省份各城市各广告的点击量 JavaDStream<Row> rowsDStream = adRealTimeStatDStream.transform(new Function<JavaPairRDD<String, Long>, JavaRDD<Row>>() { private static final long serialVersionUID = 1L; @Override public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception { // <yyyyMMdd_province_city_adid, clickCount> // <yyyyMMdd_province_adid, clickCount> // 计算出每天各省份各广告的点击量 JavaPairRDD<String,Long> mappedRDD = rdd.mapToPair(new PairFunction<Tuple2<String, Long>, String, Long>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Long> call(Tuple2<String, Long> tuple) throws Exception { String[] keySplited = tuple._1.split("_"); String date = keySplited[0]; String province = keySplited[1]; long adid = Long.valueOf(keySplited[3]); long clickCount = tuple._2; String key = date + "_" + province + "_" +adid; return new Tuple2<String,Long>(key,clickCount); } }); JavaPairRDD<String,Long> dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey(new Function2<Long, Long, Long>() { private static final long serialVerionUID = 1L; @Override public Long call(Long v1, Long v2) throws Exception { return v1+v2; } }); // 将dailyAdClickCountByProvinceRDD转换为DataFrame // 注册为一张临时表 // 使用Spark SQL,通过开窗函数,获取到各省份的top3热门广告 JavaRDD<Row> rowsRDD = dailyAdClickCountByProvinceRDD.map(new Function<Tuple2<String, Long>, Row>() { private static final long serialVerionUID = 1L; @Override public Row call(Tuple2<String, Long> tuple) throws Exception { String[] keySplited = tuple._1.split("_"); String dateKey = keySplited[0]; String province = keySplited[1]; long adid = Long.valueOf(keySplited[2]); long clickCount = tuple._2; String date = DateUtils.formatDate(DateUtils.parseDateKey(dateKey)); return RowFactory.create(date,province,adid,clickCount); } }); StructType schema = 
DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("date", DataTypes.StringType,true), DataTypes.createStructField("province", DataTypes.StringType,true), DataTypes.createStructField("ad_id", DataTypes.LongType,true), DataTypes.createStructField("click_count", DataTypes.LongType,true))); HiveContext sqlContext = new HiveContext(rdd.context()); Dataset dailyAdClickCountByProvinceDs = sqlContext.createDataFrame(rowsRDD,schema); // 将dailyAdClickCountByProvinceDF,注册成一张临时表 dailyAdClickCountByProvinceDs.registerTempTable("tmp_daily_ad_click_count_by_prov"); // 使用Spark SQL执行SQL语句,配合开窗函数,统计出各身份top3热门的广告 Dataset provinceTop3AdDs = sqlContext.sql( "SELECT " + "date, " + "province, " + "ad_id, " + "click_count " + "FROM ( " + "SELECT " + "date, " + "province, " + "ad_id, " + "click_count, " + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank " + "FROM tmp_daily_ad_click_count_by_prov " + " ) t " + "WHERE rank<=3" ); return provinceTop3AdDs.javaRDD(); } }); // rowsDStream // 每次都是刷新出来各个省份最热门的top3广告 // 将其中的数据批量更新到MySQL中 rowsDStream.foreachRDD(new VoidFunction<JavaRDD<Row>>() { private static final long serialVerionUID = 1L; @Override public void call(JavaRDD<Row> rdd) throws Exception { rdd.foreachPartition(new VoidFunction<Iterator<Row>>() { private static final long serialVersionUID = 1L; @Override public void call(Iterator<Row> iterator) throws Exception { List<AdProvinceTop3> adProvinceTop3s = new ArrayList<AdProvinceTop3>(); while (iterator.hasNext()){ Row row = iterator.next(); String date = row.getString(0); String province = row.getString(1); long adid = row.getLong(2); long clickCount = row.getLong(3); AdProvinceTop3 adProvinceTop3 = new AdProvinceTop3(); adProvinceTop3.setDate(date); adProvinceTop3.setProvince(province); adProvinceTop3.setAdid(adid); adProvinceTop3.setClickCount(clickCount); adProvinceTop3s.add(adProvinceTop3); } IAdProvinceTop3DAO adProvinceTop3DAO = DAOFactory.getAdProvinceDAO(); 
adProvinceTop3DAO.updateBatch(adProvinceTop3s); } }); } }); }