Problem: records have the structure <task id, resource type, release date, word, frequency>. Frequencies have already been aggregated by <task id, resource type, release date, word>. Now, grouping by <task id, resource type, release date>, sort each group by frequency in descending order and extract the top 200 records per group.
The implementation follows the org.apache.hadoop.examples.SecondarySort example shipped with Hadoop.
Composite key: WordFreq<TagHead, word, frequency>, where TagHead represents the group, i.e. <task id, resource type, release date>.
1. In WordFreq, override compareTo to sort within each group by descending frequency:
@Override
public int compareTo(WordFreq other) {
    // Guava ComparisonChain: group ascending, then frequency descending
    // (note the reversed operands), then word ascending as a tie-break.
    return ComparisonChain.start()
            .compare(this.getGroup(), other.getGroup())
            .compare(other.count, this.count)   // descending frequency
            .compare(this.tag.getWord(), other.tag.getWord())
            .result();
}
2. In TagHead, override the following three methods:
@Override
public int compareTo(TagHead other) {
return ComparisonChain.start().compare(this.tagsid, other.tagsid)
.compare(this.sourceType, other.sourceType)
.compare(this.releaseDateDay, other.releaseDateDay).result();
}
@Override
public boolean equals(Object o) {
if (o instanceof TagHead) {
TagHead other = (TagHead) o;
return this.tagsid.equals(other.tagsid)
&& this.sourceType.equals(other.sourceType)
&& this.releaseDateDay.equals(other.releaseDateDay);
}
return false;
}
@Override
public int hashCode() {
    int hash = (this.tagsid != null ? Integer.parseInt(this.tagsid) : 0);
    hash += (this.sourceType != null
            ? Integer.parseInt(this.sourceType) * 13 : 0);
    hash += (this.releaseDateDay != null
            ? Integer.parseInt(this.releaseDateDay.replace("-", "")) * 7 : 0);
    return hash;
}
3. Simple value type: SimpleWordFreq<word, frequency>
4. Mapper and Reducer:
public class SubSortingWordFreqMapper extends
Mapper<LongWritable, Text, WordFreq, SimpleWordFreq>{
...
}
public static class SubSortingWordFreqReducer extends
Reducer<WordFreq, SimpleWordFreq, Text, NullWritable> {
@Override
protected void reduce(WordFreq key, Iterable<SimpleWordFreq> values,
Context context) throws IOException, InterruptedException {
for (SimpleWordFreq value : values) {
...
}
}
}
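Because the sort on the composite key delivers each group's values already in descending-frequency order, the reduce body elided above only needs to emit the first 200 values and stop. A minimal stdlib sketch of that cut (the limit 200 comes from the problem statement; the helper name is illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Main {
    // Take the first n elements of an iterable that arrives already
    // sorted by descending frequency, as the composite-key sort guarantees.
    static <T> List<T> topN(Iterable<T> sortedValues, int n) {
        List<T> out = new ArrayList<>();
        for (T v : sortedValues) {
            if (out.size() >= n) break;   // stop after n records
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> freqs = Arrays.asList(9, 7, 5, 3, 1);
        System.out.println(topN(freqs, 3));   // prints [9, 7, 5]
    }
}
```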
5. Custom Partitioner that hashes only the natural key, i.e. the group:
public class TagCloudPartitioner extends Partitioner<WordFreq, SimpleWordFreq> {
private static Logger log = LoggerFactory
.getLogger(TagCloudPartitioner.class);
@Override
public int getPartition(WordFreq key, SimpleWordFreq value, int numPartitions) {
int hashCode = key.getGroup().hashCode();
log.debug(key.getGroup().getHead("_") + ";hashCode=" + hashCode);
// Mask the sign bit so the partition index is never negative.
return (hashCode & Integer.MAX_VALUE) % numPartitions;
}
}
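One caveat: a Java hashCode can be negative, and getPartition must return a value in [0, numPartitions). Masking the sign bit is the usual Hadoop idiom for keeping the index in range. A small sketch (helper name is illustrative):

```java
public class Main {
    // Map an arbitrary (possibly negative) hash to [0, numPartitions).
    static int partitionFor(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // A negative hash still yields a valid partition index.
        System.out.println(partitionFor(-123456789, 10));
    }
}
```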
6. Custom grouping comparator:
public class TagCloudHeadGroupingComparator extends WritableComparator {
protected TagCloudHeadGroupingComparator() {
super(WordFreq.class, true);
}
@Override
public int compare(WritableComparable tp1, WritableComparable tp2) {
WordFreq wordFreq = (WordFreq) tp1;
WordFreq wordFreq2 = (WordFreq) tp2;
return wordFreq.compareGroup(wordFreq2);
}
}
7. Job configuration specific to this setup:
job.setPartitionerClass(TagCloudPartitioner.class);
job.setGroupingComparatorClass(TagCloudHeadGroupingComparator.class);