使用二次排序来实现customers(顾客信息)与orders(订单信息)相关联
使用的是本地测试,并未放到集群(不影响使用)
customers.txt
(cid,name)
1,zpx
2,zpx1
3,zpx2
4,zpx3
5,zpx4
6,zpx5
orders.txt
(id,price,cid)
1,12.5,1
2,12.6,1
3,12.7,2
4,12.8,2
5,12.9,4
6,12.4,3
7,12.3,4
8,12.2,2
9,12.1,1
10,12.0,2
11,22.0,3
12,23.0,4
13,24.5,1
14,26.0,5
1.组合Key
package com.hadoop.mr.join;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key for the customer/order secondary-sort join.
 *
 * <p>Sort order: primarily by {@code cid} so all records of one customer are
 * adjacent; within a cid the customer record ({@code type == 0}) sorts before
 * its order records ({@code type == 1}); two orders of the same customer are
 * ordered by {@code oid}. The reducer relies on the customer arriving first.
 */
public class ComboKey implements WritableComparable<ComboKey> {

    // Record type: 0 = customer, 1 = order.
    private int type;
    // Customer id — the join key shared by both record types.
    private int cid;
    // Order id — only meaningful when type == 1.
    private int oid;
    // Raw customer line ("cid,name") — only set when type == 0.
    private String customerInfo = "";
    // Raw order line ("id,price,cid") — only set when type == 1.
    private String orderInfo = "";

    @Override
    public int compareTo(ComboKey o) {
        // Group by customer id first. Integer.compare avoids the overflow
        // that plain subtraction (cid - o.cid) suffers for large magnitudes.
        if (cid != o.getCid()) {
            return Integer.compare(cid, o.getCid());
        }
        // Same customer: the customer record must precede its orders.
        if (type != o.getType()) {
            return type == 0 ? -1 : 1;
        }
        // Two orders of the same customer: order by order id.
        return Integer.compare(oid, o.getOid());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(type);
        out.writeInt(cid);
        out.writeInt(oid);
        out.writeUTF(customerInfo);
        out.writeUTF(orderInfo);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.type = in.readInt();
        this.cid = in.readInt();
        this.oid = in.readInt();
        this.customerInfo = in.readUTF();
        this.orderInfo = in.readUTF();
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public int getOid() {
        return oid;
    }

    public void setOid(int oid) {
        this.oid = oid;
    }

    public String getCustomerInfo() {
        return customerInfo;
    }

    public void setCustomerInfo(String customerInfo) {
        this.customerInfo = customerInfo;
    }

    public String getOrderInfo() {
        return orderInfo;
    }

    public void setOrderInfo(String orderInfo) {
        this.orderInfo = orderInfo;
    }

    @Override
    public String toString() {
        return "ComboKey{" +
                "type=" + type +
                ", cid=" + cid +
                ", oid=" + oid +
                ", customerInfo='" + customerInfo + '\'' +
                ", orderInfo='" + orderInfo + '\'' +
                '}';
    }
}
2.按照cid自定义分区对比器
package com.hadoop.mr.join;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by customer id so every record (customer and orders)
 * of one cid lands on the same reducer — a prerequisite for the join.
 */
public class CIDPartitioner extends Partitioner<ComboKey, NullWritable> {

    @Override
    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
        // Mask the sign bit before taking the modulus: a plain
        // cid % numPartitions would return a negative (illegal) partition
        // index for a negative cid.
        return (key.getCid() & Integer.MAX_VALUE) % numPartitions;
    }
}
3.按照cid自定义分组
package com.hadoop.mr.join; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * @author 邹培贤 * @Title: ${file_name} * @Package ${package_name} * @Description: 自定义分组对比器,按照cid进行分组 * @date 2018/7/510:33 */ public class CIDGroupComparator extends WritableComparator{ protected CIDGroupComparator() { super(ComboKey.class, true); } public int compare(WritableComparable a, WritableComparable b) { ComboKey k1 = (ComboKey) a; ComboKey k2 = (ComboKey) b; return k1.getCid() - k2.getCid(); } }
4.自定义排序对比器
package com.hadoop.mr.join; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * @author 邹培贤 * @Title: ${file_name} * @Package ${package_name} * @Description: 自定义排序对比器 * @date 2018/7/510:35 */ public class ComboKeyComparator extends WritableComparator{ protected ComboKeyComparator() { super(ComboKey.class, true); } public int compare(WritableComparable a, WritableComparable b) { ComboKey k1 = (ComboKey) a; ComboKey k2 = (ComboKey) b; return k1.compareTo(k2); } }
5.Mapper
package com.hadoop.mr.join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Mapper for the customer/order join.
 *
 * <p>NOTE(review): the original text under the "5.Mapper" heading was a
 * duplicate paste of {@code ComboKeyComparator}; this is the mapper the
 * driver ({@code CustomerJoinOrderApp}) actually references. It tags each
 * input line with a {@link ComboKey}: customer lines ("cid,name" from
 * customers.txt) get type 0; order lines ("id,price,cid" from orders.txt)
 * get type 1. The raw line is carried inside the key; the value is
 * {@link NullWritable}.
 */
public class CustomerJoinOrderMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Decide the record type from the source file name of this split.
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
        String[] fields = line.split(",");

        ComboKey k = new ComboKey();
        if (fileName.startsWith("customers")) {
            // customers.txt: cid,name
            k.setType(0);
            k.setCid(Integer.parseInt(fields[0]));
            k.setCustomerInfo(line);
        } else {
            // orders.txt: id,price,cid
            k.setType(1);
            k.setOid(Integer.parseInt(fields[0]));
            k.setCid(Integer.parseInt(fields[2]));
            k.setOrderInfo(line);
        }
        context.write(k, NullWritable.get());
    }
}
6.Reducer
package com.hadoop.mr.join;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Joins one customer with all of its orders.
 *
 * <p>Thanks to the grouping comparator (cid only) and the sort comparator
 * (customer before orders), each reduce group starts with the customer
 * record; every subsequent record in the group is one of its orders.
 * Emits "customerInfo,orderInfo" per order.
 */
public class CustomerJoinOrderReducer extends Reducer<ComboKey, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<NullWritable> it = values.iterator();
        // Position on the first record of the group; as the iterator advances,
        // Hadoop re-fills 'key' with the ComboKey of the current record.
        it.next();
        if (key.getType() != 0) {
            // Orders with no matching customer record: nothing to join with.
            // (The original blindly assumed the first record was the customer,
            // which would emit rows with an empty customerInfo here.)
            return;
        }
        String cinfo = key.getCustomerInfo();
        Text outKey = new Text(); // reused across writes to avoid per-row allocation
        while (it.hasNext()) {
            it.next(); // advances 'key' to the next order of this customer
            outKey.set(cinfo + "," + key.getOrderInfo());
            context.write(outKey, NullWritable.get());
        }
    }
}
7.主函数实现
package com.hadoop.mr.join; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author 邹培贤 * @Title: ${file_name} * @Package ${package_name} * @Description: ${todo} * @date 2018/7/510:44 */ public class CustomerJoinOrderApp { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); Job job=Job.getInstance(conf); //设置job的各种属性 job.setJobName("CustomerJoinOrderApp"); //作业名称 job.setJarByClass(CustomerJoinOrderApp.class); //搜索类 //添加输入路径 FileInputFormat.addInputPath(job,new Path("F://hadoop-mr//join/")); //设置输出路径 FileOutputFormat.setOutputPath(job,new Path("F://hadoop-mr//out")); job.setMapperClass(CustomerJoinOrderMapper.class); //mapper类 job.setReducerClass(CustomerJoinOrderReducer.class); //reducer类 //设置Map输出类型 job.setMapOutputKeyClass(ComboKey.class); // job.setMapOutputValueClass(NullWritable.class); // //设置ReduceOutput类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // //设置分区类 job.setPartitionerClass(CIDPartitioner.class); //设置分组对比器 job.setGroupingComparatorClass(CIDGroupComparator.class); //设置排序对比器 job.setSortComparatorClass(ComboKeyComparator.class); job.setNumReduceTasks(1); //reduce个数 job.waitForCompletion(true); } }