1.先将Hadoop环境加入到Eclipse中
2.生成模拟数据
package com.blb.core;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Generates 300 simulated household shopping-list files, named 1.txt .. 300.txt.
 * Each household randomly decides, per category, whether to buy anything, then
 * picks random products with random quantities:
 * <ul>
 *   <li>washing supplies  — quantity 1-5</li>
 *   <li>bedding           — quantity exactly 1 (nextInt(1)+1)</li>
 *   <li>home appliances   — quantity 1-3</li>
 *   <li>kitchen utensils  — quantity 1-2</li>
 *   <li>daily staples     — quantity 1-6</li>
 * </ul>
 * Output lines are "product\tquantity", encoded as UTF-8, which is the input
 * format the accompanying MapReduce job expects.
 *
 * @author Administrator
 */
public class BuildBill {
    private static Random random = new Random();
    private static List<String> washList = new ArrayList<>();    // washing supplies
    private static List<String> bedList = new ArrayList<>();     // bedding
    private static List<String> homeList = new ArrayList<>();    // home appliances
    private static List<String> kitchenList = new ArrayList<>(); // kitchen utensils
    private static List<String> useList = new ArrayList<>();     // daily staples

    static {
        washList.add("脸盆");
        washList.add("杯子");
        washList.add("牙刷");
        washList.add("牙膏");
        washList.add("毛巾");
        washList.add("肥皂");
        washList.add("皂盒");
        washList.add("洗发水");
        washList.add("护发素");
        washList.add("沐浴液");
        ///////////////////////////////
        bedList.add("枕头");
        bedList.add("枕套");
        bedList.add("枕巾");
        bedList.add("被子");
        bedList.add("被套");
        bedList.add("棉被");
        bedList.add("毯子");
        bedList.add("床垫");
        bedList.add("凉席");
        //////////////////////////////
        homeList.add("电磁炉");
        homeList.add("电饭煲");
        homeList.add("吹风机");
        homeList.add("电水壶");
        homeList.add("豆浆机");
        // BUGFIX: "电磁炉" was added twice here, skewing the random draw toward it.
        homeList.add("台灯");
        //////////////////////////
        kitchenList.add("锅");
        kitchenList.add("碗");
        kitchenList.add("瓢");
        kitchenList.add("盆");
        // BUGFIX: was "灶 " with a trailing space, which the mapper would emit as a
        // key distinct from "灶" after splitting on \t.
        kitchenList.add("灶");
        ////////////////////////
        // NOTE(review): the class comment mentions 柴 (firewood) too, but it was
        // never in this list — left as-is; confirm intent before adding it.
        useList.add("米");
        useList.add("油");
        useList.add("盐");
        useList.add("酱");
        useList.add("醋");
    }

    /**
     * Randomly decides (≈1/2 probability) whether this household buys from a category.
     */
    private static boolean iswant() {
        return random.nextInt(1000) % 2 == 0;
    }

    /**
     * How many line items to write: a uniform draw in [0, sum).
     *
     * @param sum exclusive upper bound (callers pass listSize + 1 so the count
     *            can range from 0 up to the category's product count)
     */
    private static int wantNum(int sum) {
        return random.nextInt(sum);
    }

    /**
     * Writes one category's randomly chosen line items ("product\tquantity")
     * if the household wants anything from it. Replaces five copy-pasted loops.
     *
     * @param writer   destination (UTF-8)
     * @param products product pool to draw from (with replacement)
     * @param maxQty   maximum quantity per line item (quantity is 1..maxQty)
     */
    private static void writeCategory(BufferedWriter writer, List<String> products, int maxQty)
            throws IOException {
        if (!iswant()) {
            return;
        }
        // Number of line items must not exceed the category's product count.
        int count = wantNum(products.size() + 1);
        for (int j = 0; j < count; j++) {
            String product = products.get(random.nextInt(products.size()));
            writer.write(product + "\t" + (random.nextInt(maxQty) + 1));
            writer.newLine();
        }
    }

    /**
     * Generates the 300 list files under D:\tmp. Output must be UTF-8 so the
     * MapReduce job reads the Chinese product names correctly.
     */
    public static void main(String[] args) {
        for (int i = 1; i <= 300; i++) {
            // try-with-resources: the original leaked the stream/writer whenever
            // an exception occurred before close().
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(
                            new FileOutputStream(new File("D:\\tmp\\" + i + ".txt")), "UTF-8"))) {
                writeCategory(writer, washList, 5);
                writeCategory(writer, bedList, 1);
                writeCategory(writer, homeList, 3);
                writeCategory(writer, kitchenList, 2);
                writeCategory(writer, useList, 6);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
3.创建MapReduce项目
4.生成Mapper类、Reduce类和Driver类
5.Mapper类代码
package com.blb.core;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps one input line of the form "product\tquantity" to the pair
 * (product, quantity). Quantities are summed per product by {@link BillReduce}.
 */
public class BillMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context)
            throws IOException, InterruptedException {
        String line = ivalue.toString();
        String[] words = line.split("\t");
        // Guard: blank lines (e.g. trailing newline in a bill file) would have
        // thrown ArrayIndexOutOfBoundsException and failed the whole task.
        if (words.length < 2) {
            return;
        }
        try {
            context.write(new Text(words[0]), new IntWritable(Integer.parseInt(words[1].trim())));
        } catch (NumberFormatException ignored) {
            // Malformed quantity field — skip this record rather than kill the task.
        }
    }
}
6.Reduce类代码
package com.blb.core;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for the shopping-bill job: totals all quantities emitted by
 * {@link BillMapper} for a single product key and writes (product, total).
 */
public class BillReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text _key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate the per-product total across every mapped quantity.
        int total = 0;
        for (IntWritable quantity : values) {
            total += quantity.get();
        }
        context.write(_key, new IntWritable(total));
    }
}
7.Driver类代码
1 package com.blb.core; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 public class BillDriver { 14 15 public static void main(String[] args) throws Exception { 16 Configuration conf = new Configuration(); 17 conf.set("fs.defaultFS", "hdfs://sugar01:9000"); 18 Job job = Job.getInstance(conf, "BillDriver"); 19 job.setJarByClass(BillDriver.class); 20 // TODO: specify a mapper 21 job.setMapperClass(BillMapper.class); 22 // TODO: specify a reducer 23 job.setReducerClass(BillReduce.class); 24 25 // TODO: specify output types 26 job.setOutputKeyClass(Text.class); 27 job.setOutputValueClass(IntWritable.class); 28 29 // TODO: specify input and output DIRECTORIES (not files) 30 FileInputFormat.setInputPaths(job, new Path("/uploads")); 31 FileOutputFormat.setOutputPath(job, new Path("/out2/")); 32 33 if (!job.waitForCompletion(true)) 34 return; 35 } 36 37 }