[Single MapReduce] Writing a MapReduce program to find which table is accessed most frequently during the peak period (9-10 AM)
* This is purely my own take, solving the problem with a single MapReduce job; if anything is lacking, please point it out. Thanks.
Requirements:
- Use Hadoop to analyze a large volume of log files, where each log line records the following fields:
- TableName (table name), Time (access time), User (user), TimeSpan (time spent)
- Write a MapReduce program that works out which table is accessed most frequently during the peak period (e.g. 9-10 AM),
- as well as the user who accessed that table the most during that period, and that user's total TimeSpan on the table.
- Start by finding the table with the most accesses between 9 and 10 o'clock.
Test data:
- TableName (table name), Time (access time), User (user), TimeSpan (time spent)
- ==========================================================
- *t003 6:00 u002 180
- *t003 7:00 u002 180
- *t003 7:08 u002 180
- *t003 7:25 u002 180
- *t002 8:00 u002 180
- *t001 8:00 u001 240
- *t001 9:00 u002 300
- *t001 9:11 u001 240
- *t003 9:26 u001 180
- *t001 9:39 u001 300
- ==========================================================
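Before the MapReduce code, it may help to replay the same logic in plain Java over just these ten sample rows as a sanity check. This is only a sketch of my own (the class name and the inlined rows are assumptions, not part of the job); the total it prints covers only this small sample, while the run result at the end of the post was produced from the full input directory.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sanity-check class: filter the sample rows to the 9:00-10:00 window,
// find the most-accessed table, the most frequent user on that table,
// and that user's total TimeSpan.
public class SampleCheck {
    public static void main(String[] args) {
        String[] rows = {
                "*t003 6:00 u002 180", "*t003 7:00 u002 180", "*t003 7:08 u002 180",
                "*t003 7:25 u002 180", "*t002 8:00 u002 180", "*t001 8:00 u001 240",
                "*t001 9:00 u002 300", "*t001 9:11 u001 240", "*t003 9:26 u001 180",
                "*t001 9:39 u001 300" };
        List<String[]> inWindow = new ArrayList<>();
        Map<String, Integer> tableHits = new HashMap<>();
        for (String row : rows) {
            String[] f = row.split(" ");
            int hour = Integer.parseInt(f[1].split(":")[0]);
            if (hour == 9 || f[1].equals("10:00")) { // 9:00 <= Time <= 10:00
                inWindow.add(f);
                tableHits.merge(f[0], 1, Integer::sum);
            }
        }
        String maxTable = Collections.max(tableHits.entrySet(), Map.Entry.comparingByValue()).getKey();
        Map<String, Integer> userHits = new HashMap<>();
        for (String[] f : inWindow) {
            if (f[0].equals(maxTable)) {
                userHits.merge(f[2], 1, Integer::sum);
            }
        }
        String maxUser = Collections.max(userHits.entrySet(), Map.Entry.comparingByValue()).getKey();
        int totalSpan = 0;
        for (String[] f : inWindow) {
            if (f[0].equals(maxTable) && f[2].equals(maxUser)) {
                totalSpan += Integer.parseInt(f[3]);
            }
        }
        // Prints "*t001 u001 540" for these ten rows only
        System.out.println(maxTable + " " + maxUser + " " + totalSpan);
    }
}

The steps mirror the job below: filter to the 9-10 window, pick the hottest table, pick its heaviest user, then sum that user's TimeSpan.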
Solution code:
Mapper.java
package com.company.max;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        Date date = null;
        Date min = null;
        Date max = null;
        String string = value.toString(); // e.g. *t003 6:00 u002 180
        String[] split = string.split(" ");
        // HH = 24-hour clock; the original "hh" (12-hour clock) happens to work
        // for this data but would misread times such as 12:xx
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm");
        try {
            date = simpleDateFormat.parse(split[1]);
            min = simpleDateFormat.parse("9:00");
            max = simpleDateFormat.parse("10:00");
            // Keep only the records inside the 9:00-10:00 window:
            // *t001 9:00 u002 300
            // *t001 9:11 u001 240
            // *t003 9:26 u001 180
            // *t001 9:39 u001 300
            if (date.compareTo(min) >= 0 && date.compareTo(max) <= 0) {
                context.write(new Text(split[0]),
                        new Text(split[1] + " " + split[2] + " " + split[3]));
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
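As a side note, on Java 8 or later the same 9-10 window check can also be written with java.time.LocalTime, which avoids the 12-hour/24-hour ambiguity of SimpleDateFormat patterns entirely. A minimal standalone sketch of my own, not part of the job:

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical demo class: the same 9:00-10:00 filter using java.time
public class TimeWindowDemo {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("H:mm"); // accepts "6:00" as well as "10:00"
        LocalTime min = LocalTime.of(9, 0);
        LocalTime max = LocalTime.of(10, 0);
        for (String s : new String[] { "6:00", "9:00", "9:39", "10:00" }) {
            LocalTime t = LocalTime.parse(s, fmt);
            boolean inWindow = !t.isBefore(min) && !t.isAfter(max);
            System.out.println(s + " -> " + inWindow); // 6:00 -> false, the rest -> true
        }
    }
}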
Reducer.java
package com.company.max;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxReducer extends Reducer<Text, Text, Text, Text> {

    // After all reduce() calls have finished, the framework calls cleanup()
    // exactly once, so the remaining aggregation is done there.
    Map<String, Integer> map = new HashMap<>();      // table name -> access count
    Map<String, Integer> hashMap = new HashMap<>();  // user -> access count on the hottest table
    List<String> list = new ArrayList<>();           // saved "table \t time user span" records

    @Override
    protected void reduce(Text key, Iterable<Text> value,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for (Text text : value) {
            // The values can only be iterated once, so stash them in the list for later use
            list.add(key.toString() + "\t" + text.toString());
            // Count how many times each table name appears, e.g.
            // *t001 3
            // *t003 1
            if (map.get(key.toString()) == null) {
                map.put(key.toString(), 1);
            } else {
                map.put(key.toString(), map.get(key.toString()) + 1);
            }
        }
    }

    @Override
    protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Helper that returns the key with the highest count; it is reused below
        // when finding the most frequent user as well
        String maxKey = getMaxKey(map);
        // Helper that finds the user who accessed that table the most
        String maxUser = getMaxUser(maxKey);
        int sum = 0;
        for (String string : list) {
            String[] split = string.split("\t");
            if (split[0].equals(maxKey)) {
                // *t001 9:00 u002 300
                // *t001 9:11 u001 240
                // *t001 9:39 u001 300
                String[] split2 = split[1].split(" ");
                if (split2[1].equals(maxUser)) {
                    sum += Integer.parseInt(split2[2]);
                }
            }
        }
        String key = maxKey + " " + maxUser + " " + sum;
        context.write(new Text(key), new Text());
    }

    private String getMaxUser(String maxKey) {
        for (String string : list) {
            String[] split = string.split("\t");
            if (split[0].equals(maxKey)) {
                System.out.println(string);
                String[] split2 = split[1].split(" ");
                // Count how many times each user appears on the hottest table, e.g.
                // u001 2
                // u002 1
                if (hashMap.get(split2[1]) == null) {
                    hashMap.put(split2[1], 1);
                } else {
                    hashMap.put(split2[1], hashMap.get(split2[1]) + 1);
                }
            }
        }
        String maxUser = getMaxKey(hashMap);
        return maxUser;
    }

    private String getMaxKey(Map<String, Integer> map2) {
        String key = "";
        int val = 0;
        Set<Entry<String, Integer>> set = map2.entrySet();
        for (Entry<String, Integer> entry : set) {
            // Keep the entry with the largest count and return its key
            if (val < entry.getValue()) {
                val = entry.getValue();
                key = entry.getKey();
            }
        }
        return key;
    }
}
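Two notes on the design of this reducer. First, because the final aggregation happens in cleanup(), the answer is only global if every key reaches the same reducer; that holds here because Hadoop's default is a single reduce task (the driver could also pin this explicitly with job.setNumReduceTasks(1)). Second, getMaxKey() simply picks the entry with the largest count, which the JDK can also express directly; a small standalone illustration of my own (class name and sample counts are made up):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo: the same "largest count wins" selection as getMaxKey()
public class MaxKeyDemo {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("*t001", 3);
        counts.put("*t003", 1);
        String maxKey = Collections.max(counts.entrySet(),
                Map.Entry.comparingByValue()).getKey();
        System.out.println(maxKey); // *t001
    }
}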
Runner.java
package com.company.max;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxRunner {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setMapperClass(MaxMapper.class);
        job.setReducerClass(MaxReducer.class);
        job.setJarByClass(MaxRunner.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/Users/xuran/Desktop/month02"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/xuran/Desktop/month02/result"));
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
}
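The driver above hard-codes local paths, which is convenient when running from the IDE. As a sketch of my own (class name and command-line arguments are assumptions), a variant can read the input and output directories from the command line and pin the job to one reduce task, matching the default that the cleanup()-based aggregation in MaxReducer relies on:

package com.company.max;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxRunnerWithArgs {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MaxRunnerWithArgs.class);
        job.setMapperClass(MaxMapper.class);
        job.setReducerClass(MaxReducer.class);
        job.setNumReduceTasks(1); // the global max in cleanup() needs a single reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory, must not exist yet
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Packaged into a jar, it can then be launched with hadoop jar, passing the two directories as arguments.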
Run result:
*t001 u001 2160