版权声明:1911907658 https://blog.csdn.net/qq_33598343/article/details/84454788
需求:按商品号进行分区和分组,每个商品号只输出一行记录,即该商品号下价格最高的那一条
数据:
字段之间以"\t"(制表符)分隔
1001 Tmall_01 998
1001 Tmall_06 88.8
1001 Tmall_03 522.8
1002 Tmall_03 522.8
1002 Tmall_04 132.4
1002 Tmall_05 372.4
1003 Tmall_01 998
1003 Tmall_02 8.5
1003 Tmall_04 132.4
Mapper:
public class GoodMapper extends Mapper<LongWritable, Text, GoodsBeen, NullWritable>{
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
String[] fields = line.split("\t");
int p_id = Integer.parseInt(fields[0]);
String p_name = fields[1];
Double p_price = Double.parseDouble(fields[2]);
context.write(new GoodsBeen(p_id, p_name, p_price), NullWritable.get());
}
GoodsBeen:
/**
 * Composite map-output key holding one product record: id, name and price.
 *
 * <p>The natural ordering is product id ascending, then price descending. Keeping
 * all records of one product id adjacent, with the most expensive one first, is
 * what allows a grouping comparator that compares only the id to hand each
 * product's highest-priced record to the reducer as the group's key.
 */
public class GoodsBeen implements WritableComparable<GoodsBeen> {

    private int p_id;
    private String p_name;
    private Double p_price;

    /** No-argument constructor; fields are populated later via {@link #readFields(DataInput)} or the setters. */
    public GoodsBeen() {
        super();
    }

    /**
     * Creates a fully populated record.
     *
     * @param p_id    product id
     * @param p_name  product name
     * @param p_price product price
     */
    public GoodsBeen(int p_id, String p_name, Double p_price) {
        super();
        this.p_id = p_id;
        this.p_name = p_name;
        this.p_price = p_price;
    }

    /**
     * Orders records by product id ascending, then by price descending.
     *
     * <p>Comparing the id first keeps records of the same product contiguous after
     * sorting; comparing price only would interleave different products whenever
     * they share a reducer. Unlike a bare {@code a > b ? -1 : 1}, this returns 0
     * for records with equal id and price, as the {@code Comparable} contract requires.
     *
     * @param o the record to compare against
     * @return negative, zero or positive per the {@code Comparable} contract
     */
    @Override
    public int compareTo(GoodsBeen o) {
        int byId = Integer.compare(this.p_id, o.p_id);
        if (byId != 0) {
            return byId;
        }
        // Arguments swapped on purpose: higher price sorts first.
        return Double.compare(o.p_price, this.p_price);
    }

    /**
     * Serializes the fields in the order id, name, price.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(p_id);
        out.writeUTF(p_name);
        out.writeDouble(p_price);
    }

    /**
     * Restores the fields in the same order {@link #write(DataOutput)} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        p_id = in.readInt();
        p_name = in.readUTF();
        p_price = in.readDouble();
    }

    /** @return the record as {@code id<TAB>name<TAB>price} */
    @Override
    public String toString() {
        return p_id + "\t" + p_name + "\t" + p_price;
    }

    public int getP_id() {
        return p_id;
    }

    public void setP_id(int p_id) {
        this.p_id = p_id;
    }

    public String getP_name() {
        return p_name;
    }

    public void setP_name(String p_name) {
        this.p_name = p_name;
    }

    public Double getP_price() {
        return p_price;
    }

    public void setP_price(Double p_price) {
        this.p_price = p_price;
    }
}
Reducer:
/**
 * Reducer that emits each group's key exactly once and ignores the grouped values.
 */
public class GoodsReducer extends Reducer<GoodsBeen, NullWritable, GoodsBeen, NullWritable> {

    /**
     * Writes the key passed in for this group, paired with a {@code NullWritable}.
     * The values are never iterated, so only this single record is written per call.
     *
     * @param key     the key handed to this reduce call
     * @param value   the grouped values (unused)
     * @param context task context used to emit the output record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(GoodsBeen key, Iterable<NullWritable> value, Context context)
            throws IOException, InterruptedException {
        final NullWritable placeholder = NullWritable.get();
        context.write(key, placeholder);
    }
}
Partition:
/**
 * Partitioner that routes each record by its product id modulo the partition count.
 */
public class GoodPartition extends Partitioner<GoodsBeen, NullWritable> {

    /**
     * Computes the partition index for a record.
     *
     * @param key       record whose product id selects the partition
     * @param value     unused
     * @param partition total number of partitions
     * @return the product id, with its sign bit cleared, modulo {@code partition}
     */
    @Override
    public int getPartition(GoodsBeen key, NullWritable value, int partition) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the remainder is never negative.
        final int nonNegativeId = key.getP_id() & Integer.MAX_VALUE;
        return nonNegativeId % partition;
    }
}
Group:
/**
 * Grouping comparator that treats two {@link GoodsBeen} keys as equal when their
 * product ids match, regardless of name or price.
 */
public class GoodGroup extends WritableComparator {

    /**
     * Registers {@link GoodsBeen} as the key class with the parent comparator,
     * passing {@code true} as the second argument.
     */
    public GoodGroup() {
        super(GoodsBeen.class, true);
    }

    /**
     * Compares two keys by product id only.
     *
     * @param a left key; must be a {@link GoodsBeen}
     * @param b right key; must be a {@link GoodsBeen}
     * @return -1, 0 or 1 when the left id is less than, equal to or greater than the right id
     */
    @Override
    @SuppressWarnings("rawtypes") // raw parameter types mirror the signature written in the original override
    public int compare(WritableComparable a, WritableComparable b) {
        final GoodsBeen left = (GoodsBeen) a;
        final GoodsBeen right = (GoodsBeen) b;
        return Integer.compare(left.getP_id(), right.getP_id());
    }
}
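GoodGroup 的这个无参构造方法必须加,并在其中调用 super(GoodsBeen.class, true) 指定用于辅助排序(分组)的 key 类型,否则运行时会报空指针异常。compare 方法只比较商品号:商品号相同时返回 0,表示两条记录属于同一组,由同一次 reduce 调用处理;Reducer 对每组只输出一次 key,因此每个商品号(本例中也就是每个分区)只得到一行数据。注意:要让这一行恰好是最高价的记录,排序阶段必须保证同一商品号的记录相邻,并且价格从高到低排列。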
Job:
/**
 * Driver that configures and submits the job: {@link GoodMapper} and
 * {@link GoodsReducer}, partitioned by {@link GoodPartition}, grouped by
 * {@link GoodGroup}, running with three reduce tasks.
 */
public class GoodJobDrive {

    /** Input path used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT = "B:/笔记/数据素材/商品表.txt";

    /** Output directory used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT = "B:/笔记/数据素材/商品表测试";

    /**
     * Configures the job, runs it to completion and prints whether it succeeded.
     * The process exits with status 1 if the job fails.
     *
     * @param args optional {@code <input path> <output path>}; when fewer than two
     *             arguments are supplied the built-in default paths are used
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        boolean pathsGiven = args != null && args.length >= 2;
        String inputPath = pathsGiven ? args[0] : DEFAULT_INPUT;
        String outputPath = pathsGiven ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GoodJobDrive.class);

        job.setMapperClass(GoodMapper.class);
        job.setReducerClass(GoodsReducer.class);

        job.setMapOutputKeyClass(GoodsBeen.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(GoodsBeen.class);
        job.setOutputValueClass(NullWritable.class);

        job.setGroupingComparatorClass(GoodGroup.class);
        job.setPartitionerClass(GoodPartition.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        System.out.println(succeeded);
        if (!succeeded) {
            // Surface the failure to shells and schedulers instead of exiting 0.
            System.exit(1);
        }
    }
}