本文主要通过 Spark SQL 实现新闻网站关键指标的离线分析功能,包括以下五项指标:
1 页面pv统计以及排序
2 页面uv统计以及排序
3 新用户注册比例统计
4 用户跳出比例统计
5 板块热度排行榜统计
首先需要生成对应的访问数据
import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Random; /** * 离线数据生成器 * @author Administrator * */ public class OfflineDataGenerator { public static void main(String[] args) throws Exception { StringBuffer buffer = new StringBuffer(""); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); Random random = new Random(); String[] sections = new String[] {"country", "international", "sport", "entertainment", "movie", "carton", "tv-show", "technology", "internet", "car"}; int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; // 生成日期,默认就是昨天 Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); cal.add(Calendar.DAY_OF_YEAR, -1); Date yesterday = cal.getTime(); String date = sdf.format(yesterday); // 生成1000条访问数据 for(int i = 0; i < 1000; i++) { // 生成时间戳 long timestamp = new Date().getTime(); // 生成随机userid(默认1000注册用户,每天1/10的访客是未注册用户) Long userid = 0L; int newOldUser = newOldUserArr[random.nextInt(10)]; if(newOldUser == 1) { userid = null; } else { userid = (long) random.nextInt(1000); } // 生成随机pageid(总共1k个页面) Long pageid = (long) random.nextInt(1000); // 生成随机版块(总共10个版块) String section = sections[random.nextInt(10)]; // 生成固定的行为,view String action = "view"; buffer.append(date).append("") .append(timestamp).append("") .append(userid).append("") .append(pageid).append("") .append(section).append("") .append(action).append("\n"); } // 生成10条注册数据 for(int i = 0; i < 10; i++) { // 生成时间戳 long timestamp = new Date().getTime(); // 新用户都是userid为null Long userid = null; // 生成随机pageid,都是null Long pageid = null; // 生成随机版块,都是null String section = null; // 生成固定的行为,view String action = "register"; buffer.append(date).append("") .append(timestamp).append("") .append(userid).append("") .append(pageid).append("") .append(section).append("") .append(action).append("\n"); } PrintWriter pw = null; try { pw = new PrintWriter(new 
OutputStreamWriter( new FileOutputStream("C:\\Users\\Administrator\\Desktop\\access.log"))); pw.write(buffer.toString()); } catch (Exception e) { e.printStackTrace(); } finally { pw.close(); } } }
下面是各项离线指标的具体分析代码:
import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.hive.HiveContext; /** * 新闻网站关键指标离线统计Spark作业 * @author Administrator * */ public class NewsOfflineStatSpark { public static void main(String[] args) { String yesterday = getYesterday(); // 创建SparkConf以及Spark上下文 SparkConf conf = new SparkConf() .setAppName("NewsOfflineStatSpark") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext hiveContext = new HiveContext(sc.sc()); // 开发第一个关键指标:页面pv统计以及排序 calculateDailyPagePv(hiveContext, yesterday); // 开发第二个关键指标:页面uv统计以及排序 calculateDailyPageUv(hiveContext, yesterday); // 开发第三个关键指标:新用户注册比率统计 calculateDailyNewUserRegisterRate(hiveContext, yesterday); // 开发第四个关键指标:用户跳出率统计 calculateDailyUserJumpRate(hiveContext, yesterday); // 开发第五个关键指标:版块热度排行榜 calculateDailySectionPvSort(hiveContext, yesterday); // 关闭Spark上下文 sc.close(); } /** * 获取昨天的字符串类型的日期 * @return 日期 */ private static String getYesterday() { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); cal.add(Calendar.DAY_OF_YEAR, -1); Date yesterday = cal.getTime(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); return sdf.format(yesterday); } /** * 计算每天每个页面的pv以及排序 * 排序的好处:排序后,插入mysql,java web系统要查询每天pv top10的页面,直接查询mysql表limit 10就可以 * 如果我们这里不排序,那么java web系统就要做排序,反而会影响java web系统的性能,以及用户响应时间 */ private static void calculateDailyPagePv( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "pageid," + "pv " + "FROM ( " + "SELECT " + "date," + "pageid," + "count(*) pv " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,pageid " + ") t " + "ORDER BY pv DESC "; DataFrame df = hiveContext.sql(sql); // 在这里,我们也可以转换成一个RDD,然后对RDD执行一个foreach算子 // 在foreach算子中,将数据写入mysql中 df.show(); } /** * 计算每天每个页面的uv以及排序 * Spark 
SQL的count(distinct)语句,有bug,默认会产生严重的数据倾斜 * 只会用一个task,来做去重和汇总计数,性能很差 * @param hiveContext * @param date */ private static void calculateDailyPageUv( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "pageid," + "uv " + "FROM ( " + "SELECT " + "date," + "pageid," + "count(*) uv " + "FROM ( " + "SELECT " + "date," + "pageid," + "userid " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,pageid,userid " + ") t2 " + "GROUP BY date,pageid " + ") t " + "ORDER BY uv DESC "; DataFrame df = hiveContext.sql(sql); df.show(); } /** * 计算每天的新用户注册比例 * @param hiveContext * @param date */ private static void calculateDailyNewUserRegisterRate( HiveContext hiveContext, String date) { // 昨天所有访问行为中,userid为null,新用户的访问总数 String sql1 = "SELECT count(*) FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NULL"; // 昨天的总注册用户数 String sql2 = "SELECT count(*) FROM news_access WHERE action='register' AND date='" + date + "' "; // 执行两条SQL,获取结果 Object result1 = hiveContext.sql(sql1).collect()[0].get(0); long number1 = 0L; if(result1 != null) { number1 = Long.valueOf(String.valueOf(result1)); } Object result2 = hiveContext.sql(sql2).collect()[0].get(0); long number2 = 0L; if(result2 != null) { number2 = Long.valueOf(String.valueOf(result2)); } // 计算结果 System.out.println("======================" + number1 + "======================"); System.out.println("======================" + number2 + "======================"); double rate = (double)number2 / (double)number1; System.out.println("======================" + formatDouble(rate, 2) + "======================"); } /** * 计算每天的用户跳出率 * @param hiveContext * @param date */ private static void calculateDailyUserJumpRate( HiveContext hiveContext, String date) { // 计算已注册用户的昨天的总的访问pv String sql1 = "SELECT count(*) FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NOT NULL "; // 已注册用户的昨天跳出的总数 String sql2 = "SELECT count(*) FROM ( SELECT count(*) cnt 
FROM news_access WHERE action='view' AND date='" + date + "' AND userid IS NOT NULL GROUP BY userid HAVING cnt=1 ) t "; // 执行两条SQL,获取结果 Object result1 = hiveContext.sql(sql1).collect()[0].get(0); long number1 = 0L; if(result1 != null) { number1 = Long.valueOf(String.valueOf(result1)); } Object result2 = hiveContext.sql(sql2).collect()[0].get(0); long number2 = 0L; if(result2 != null) { number2 = Long.valueOf(String.valueOf(result2)); } // 计算结果 System.out.println("======================" + number1 + "======================"); System.out.println("======================" + number2 + "======================"); double rate = (double)number2 / (double)number1; System.out.println("======================" + formatDouble(rate, 2) + "======================"); } /** * 计算每天的版块热度排行榜 * @param hiveContext * @param date */ private static void calculateDailySectionPvSort( HiveContext hiveContext, String date) { String sql = "SELECT " + "date," + "section," + "pv " + "FROM ( " + "SELECT " + "date," + "section," + "count(*) pv " + "FROM news_access " + "WHERE action='view' " + "AND date='" + date + "' " + "GROUP BY date,section " + ") t " + "ORDER BY pv DESC "; DataFrame df = hiveContext.sql(sql); df.show(); } /** * 格式化小数 * @param num 字符串 * @param scale 四舍五入的位数 * @return 格式化小数 */ private static double formatDouble(double num, int scale) { BigDecimal bd = new BigDecimal(num); return bd.setScale(scale, BigDecimal.ROUND_HALF_UP).doubleValue(); } }