package cn.focus.dc.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * Counts distinct uids per (field 1, field 2, UGC action) key.
 *
 * @author qiaowang
 */
public class PingeUgcGroupStat {

    private static final String PINGE_ACTIVE = "pinge.log";

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // UGC actions we want to count.
            String[] ugc = { "user.login", "user.register", "pic.fav", "pic.unfav",
                    "user.follow", "user.unfollow", "pic.like", "pic.unlike" };
            Set<String> ugcSet = new HashSet<String>();
            for (int i = 0; i < ugc.length; i++) {
                ugcSet.add(ugc[i]);
            }

            String line = value.toString();
            String[] words = line.split("\\|");
            if (ugcSet.contains(words[3]) && !"".equals(words[4])) {
                // Line without a version field: action in field 3, uid in field 4.
                StringBuilder buf = new StringBuilder();
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[3]);
                word.set(buf.toString());
                int uid = Integer.valueOf(words[4]);
                output.collect(word, new IntWritable(uid));
            } else if (ugcSet.contains(words[4]) && !"".equals(words[5])) {
                // Line with a version field: action in field 4, uid in field 5.
                StringBuilder buf = new StringBuilder();
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[4]);
                word.set(buf.toString());
                int uid = Integer.valueOf(words[5]);
                output.collect(word, new IntWritable(uid));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // All uids for a key arrive in one reduce() call, so a local set
            // is enough to count distinct users.
            Set<Integer> uids = new HashSet<Integer>();
            while (values.hasNext()) {
                uids.add(values.next().get());
            }
            output.collect(key, new IntWritable(uids.size()));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PingeUgcGroupStat.class);
        conf.setJobName("pingeUgcGroupStat");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        // No combiner here: Reduce emits distinct-uid counts, and partial
        // distinct counts cannot simply be merged on the map side.
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(args[1]), config); // HDFS
        FileSystem local = FileSystem.getLocal(config);                // local file system
        Path inputDir = new Path(args[0]);  // local input directory (should end with "/")
        Path hdfsFile = new Path(args[1]);  // merged log file on HDFS, also the job input

        try {
            // List the local input directory.
            FileStatus[] inputFiles = local.listStatus(inputDir);
            // Create the HDFS file the local logs are merged into.
            FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            });
            for (int i = 0; i < inputFiles.length; i++) {
                if (inputFiles[i].isDir()) {
                    // Each subdirectory is expected to contain one PINGE_ACTIVE log file.
                    String fileName = args[0] + inputFiles[i].getPath().getName() + "/" + PINGE_ACTIVE;
                    Path filePath = new Path(fileName);
                    FSDataInputStream in = null;
                    try {
                        in = local.open(filePath);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (null != in) {
                        byte buffer[] = new byte[256];
                        int bytesRead = 0;
                        while ((bytesRead = in.read(buffer)) > 0) {
                            out.write(buffer, 0, bytesRead); // append to the merged file
                        }
                        in.close();
                    }
                }
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Remove the output directory if it already exists.
        deleteFromHdfs(args[2]);
        // Run the job.
        JobClient.runJob(conf);
    }

    /** Delete a path from HDFS. */
    private static void deleteFromHdfs(String dst) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }
}
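The job above, like the active-user job below, assumes pipe-delimited log lines where the action name sits in field 3 (or in field 4 when a version field is present) and the uid immediately follows it. The meaning of the other fields is not spelled out in the code, so the sample line in this small standalone sketch is purely hypothetical and only illustrates how the split indices are used:

public class LogLineSplitDemo {
    public static void main(String[] args) {
        // Hypothetical line WITHOUT a version field; fields 0-2 are placeholders,
        // field 3 is the action name, field 4 is the uid.
        String line = "0|2013-08-24|ios|pic.like|123456";
        String[] words = line.split("\\|");
        // Key the mapper would emit for this line: words[1] \t words[2] \t action
        String mapKey = words[1] + "\t" + words[2] + "\t" + words[3];
        int uid = Integer.valueOf(words[4]);
        System.out.println(mapKey + " -> uid " + uid);
    }
}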
Get active users:
package cn.focus.dc.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * Counts distinct uids for user.active events, keyed by (field 1, field 2, action).
 *
 * @author qiaowang
 */
public class PingeActiveStat {

    private static final String PINGE_ACTIVE = "pinge.access.log";

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] words = line.split("\\|");
            StringBuilder buf = new StringBuilder();
            if ("user.active".equals(words[3]) && !"".equals(words[4])) {
                // Line without a version field: action in field 3, uid in field 4.
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[3]);
                int uid = Integer.valueOf(words[4]);
                word.set(buf.toString());
                output.collect(word, new IntWritable(uid));
            } else if ("user.active".equals(words[4]) && !"".equals(words[5])) {
                // Line with a version field: action in field 4, uid in field 5.
                buf.append(words[1]).append("\t").append(words[2]).append("\t").append(words[4]);
                int uid = Integer.valueOf(words[5]);
                word.set(buf.toString());
                output.collect(word, new IntWritable(uid));
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Count distinct active uids per key with a local set.
            Set<Integer> uids = new HashSet<Integer>();
            while (values.hasNext()) {
                uids.add(values.next().get());
            }
            output.collect(key, new IntWritable(uids.size()));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PingeActiveStat.class);
        conf.setJobName("pingeActiveStat");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        // No combiner here: Reduce emits distinct-uid counts, and partial
        // distinct counts cannot simply be merged on the map side.
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(args[1]), config); // HDFS
        FileSystem local = FileSystem.getLocal(config);                // local file system
        Path inputDir = new Path(args[0]);  // local input directory (should end with "/")
        Path hdfsFile = new Path(args[1]);  // merged log file on HDFS, also the job input

        try {
            // List the local input directory.
            FileStatus[] inputFiles = local.listStatus(inputDir);
            // Create the HDFS file the local logs are merged into.
            FSDataOutputStream out = hdfs.create(hdfsFile, new Progressable() {
                @Override
                public void progress() {
                    System.out.print(".");
                }
            });
            for (int i = 0; i < inputFiles.length; i++) {
                if (inputFiles[i].isDir()) {
                    // Each subdirectory is expected to contain one PINGE_ACTIVE log file.
                    String fileName = args[0] + inputFiles[i].getPath().getName() + "/" + PINGE_ACTIVE;
                    Path filePath = new Path(fileName);
                    FSDataInputStream in = null;
                    try {
                        in = local.open(filePath);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (null != in) {
                        byte buffer[] = new byte[256];
                        int bytesRead = 0;
                        while ((bytesRead = in.read(buffer)) > 0) {
                            out.write(buffer, 0, bytesRead); // append to the merged file
                        }
                        in.close();
                    }
                }
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Remove the output directory if it already exists.
        deleteFromHdfs(args[2]);
        // Run the job.
        JobClient.runJob(conf);
    }

    /** Delete a path from HDFS. */
    private static void deleteFromHdfs(String dst) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }
}
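Note that Reduce cannot be reused as a combiner in either job: it emits a distinct-uid count, and distinct counts computed on partial data cannot simply be merged, because the same uid may be seen by several map tasks. A minimal standalone illustration, with made-up uid values:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DistinctNotCombinable {
    public static void main(String[] args) {
        // Two map tasks see overlapping uids for the same key.
        Set<Integer> mapTask1 = new HashSet<Integer>(Arrays.asList(100, 200));
        Set<Integer> mapTask2 = new HashSet<Integer>(Arrays.asList(200, 300));

        // Summing partial distinct counts over-counts: 2 + 2 = 4.
        int merged = mapTask1.size() + mapTask2.size();

        // The true distinct count over all uids is 3: {100, 200, 300}.
        Set<Integer> all = new HashSet<Integer>(mapTask1);
        all.addAll(mapTask2);

        System.out.println("sum of partial counts = " + merged);     // 4
        System.out.println("true distinct count   = " + all.size()); // 3
    }
}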
Compile:
/usr/lib/jdk1.6.0_33/bin/javac -classpath /opt/hadoop/hadoop-core-1.1.2.jar -d /home/hadoop/pinge_classes/ /home/hadoop/pinge_classes/PingeActiveStat.java
hadoop@Master:~/pinge_classes$ /usr/lib/jdk1.6.0_33/bin/jar -cvf /home/hadoop/PingeActiveStat.jar -C /home/hadoop/pinge_classes/ .
Run:
/opt/hadoop/bin/hadoop jar /home/hadoop/PingeActiveStat.jar cn.focus.dc.hadoop.PingeActiveStat /opt/tmp_log/pinge-access-2013-08-18.log hdfs://10.1.77.213:54310/user/hadoop/pinge_access/pinge-access-2013-08-18.log hdfs://10.1.77.213:54310/user/hadoop/pinge_access_output/
Set permissions:
/opt/apps/hadoop/bin/hadoop fs -chmod -R 777 /user
Compile, package, and run the UGC job:
/usr/lib/jdk1.6.0_33/bin/javac -classpath /opt/hadoop/hadoop-core-1.1.2.jar -d /home/hadoop/pinge_ugc_classes/ /home/hadoop/pinge_ugc_classes/PingeUgcStat.java
/usr/lib/jdk1.6.0_33/bin/jar -cvf /home/hadoop/PingeUgcStat.jar -C /home/hadoop/pinge_ugc_classes/ .
/opt/hadoop/bin/hadoop jar /home/hadoop/PingeUgcStat.jar cn.focus.dc.hadoop.PingeUgcStat /opt/tmp_log/pinge-2013-08-24.log hdfs://10.1.77.213:54310/user/hadoop/pinge_ugc/pinge-ugc-2013-08-24.log hdfs://10.1.77.213:54310/user/hadoop/pinge_ugc_output/
/opt/apps/hadoop/bin/hadoop jar /opt/stat/PingeUgcStat.jar cn.focus.dc.hadoop.PingeUgcStat /opt/tmp_log/pinge-2013-08-24.log hdfs://localhost:54310/user/hadoop/pinge_ugc/pinge-ugc-2013-08-24.log hdfs://localhost:54310/user/hadoop/pinge_ugc_output/