微博内容(如图):ID content
公式:
TF:词条在某个微博中出现的词频(出现次数).
N:微博总数
DF:词条在多少个微博中出现过
案例用到四个reduceTask,下标计数从0开始,三个统计词频TF,一个统计微博总数N。
FirstMapper.java
对输入文件的每行记录微博内容进行分词,统计微博词频TF及微博总数,每个词条输出词频数1;每个微博输出一个count=1
package com.jeff.mr.tf;
import java.io.IOException;
import java.io.StringReader;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
/**
* TF:词条在某个微博中出现的词频(出现次数).
N:微博总数
DF:词条在多少个微博中出现过
--------------------------------
* 第一个MR,计算TF和计算N(微博总数)
* @author root
*
*/
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//value是微博文件每一行以制表符\t隔开
String[] v =value.toString().trim().split("\t");
if(v.length>=2){
String id=v[0].trim();
String content =v[1].trim();
//对微博内容进行中文分词处理
StringReader sr =new StringReader(content);
IKSegmenter ikSegmenter =new IKSegmenter(sr, true);
Lexeme word=null;
while( (word=ikSegmenter.next()) !=null ){
String w= word.getLexemeText();//w就是微博内容的每一个词汇
//输出格式为:key为:词汇_微博ID value是1,出现次数
context.write(new Text(w+"_"+id), new IntWritable(1));
}
//每执行一次这个方法,就表示统计了一条微博数,将来在第四个reduce分区执行,参见FirstPartition,自定义分区规则
context.write(new Text("count"), new IntWritable(1));
}else{
System.out.println(value.toString()+"-------------");
}
}
}
FirstPartition.java
自定义分区,使得key为count的分区到最后一个分区(编号3),其他的分别分区编号为0/1/2三个reduceTask
package com.jeff.mr.tf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* 第一个MR自定义分区,把key为count的,即用来计算微博总数的数据分区到第四个reduce分区,
* 前三个reduce分区用来计算TF,就是单个微博中词汇出现次数
* @author root
*
*/
public class FirstPartition extends HashPartitioner<Text, IntWritable>{
public int getPartition(Text key, IntWritable value, int reduceCount) {
if(key.equals(new Text("count")))
return 3;
else
return super.getPartition(key, value, reduceCount-1);
}
}
FirstReduce.java
计算单个词条的词频TF,输入数据为FirstMapper.java的输出,key为词条_id.或者count,值为词频个数或者count个数,当key为count时不参与计算只输出查看。
输出格式:词条_ID 词频
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* c1_001,2
* c2_001,1
* count,10000
* @author root
*
*/
public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text arg0, Iterable<IntWritable> arg1,
Context arg2)
throws IOException, InterruptedException {
int sum =0;
for( IntWritable i :arg1 ){
sum= sum+i.get();
}
if(arg0.equals(new Text("count"))){
System.out.println(arg0.toString() +"___________"+sum);
}
arg2.write(arg0, new IntWritable(sum));
}
}
在dfs-location上新建路径:/usr/input/tf-idf并上传文件微博内容:
接下来就可以执行FirstJob.java来执行第一个MR:
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node4:8020");
config.set("yarn.resourcemanager.hostname", "node4");
try {
FileSystem fs =FileSystem.get(config);
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(FirstJob.class);
job.setJobName("weibo1");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// job.setMapperClass();
job.setNumReduceTasks(4);
job.setPartitionerClass(FirstPartition.class);
job.setMapperClass(FirstMapper.class);
job.setCombinerClass(FirstReduce.class);
job.setReducerClass(FirstReduce.class);
FileInputFormat.addInputPath(job, new Path("/usr/input/tf-idf"));
Path path =new Path("/usr/output/weibo1");
if(fs.exists(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job,path);
boolean f= job.waitForCompletion(true);
if(f){
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行成功:
刷新DFS-Location,看到在/usr/output/weibo1的目录下生成了四个分区文件,每一个分区文件都是四个reduceTask的输出文件
其中第四个分区文件就是用来计算Count微博总数N的,其他三个都是微博中词汇即出现次数。
比如:0.03元_3824213951437432 1
这个就表示0.03元这个词在ID为3824213951437432微博中出现了1次
TwoMapper.java
统计DF,词条在多少个微博中出现过
输出格式:词条 出现的微博个数
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
//统计df:词在多少个微博中出现过。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/**
* 1 获取当前 mapper Task的数据片段(split)
* 2 当前mapper Task的数据来源于第一个MR输出的四个文件
*/
FileSplit fs = (FileSplit) context.getInputSplit();
//可以从fs获取第一个MR的文件名,除了最后一个文件是用来计算微博总数的,其他都是TF
if (!fs.getPath().getName().contains("part-r-00003")) {
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
//获取{0.03元_3824213951437432 1},这种第一个MR的输出数据,即每一行
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];//得到每一个词汇,输出次数1,此处所有微博的词汇都会输出1次
context.write(new Text(w), new IntWritable(1));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
TwoReduce.java
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 计算词汇在所有微博中出现的次数
* @author jeffSheng
* 2018年10月17日
*/
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* 输入数据:
* key:0.03元 value:1(次)
* Iterable<IntWritable> arg1,即key相等的一组数据
*/
protected void reduce(Text key, Iterable<IntWritable> arg1,Context context)
throws IOException, InterruptedException {
int sum =0;
for( IntWritable i :arg1 ){
sum= sum + i.get();
}
context.write(key, new IntWritable(sum));
}
}
执行TwoJob.java第二个MR,计算每个词汇在所有微博出现次数即DF
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node4:8020");
config.set("yarn.resourcemanager.hostname", "node4");
try {
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(TwoJob.class);
job.setJobName("weibo2");
//设置map任务的输出key类型、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// job.setMapperClass();
job.setMapperClass(TwoMapper.class);
job.setCombinerClass(TwoReduce.class);
job.setReducerClass(TwoReduce.class);
//mr运行时的输入数据从hdfs的哪个目录中获取
FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
FileOutputFormat.setOutputPath(job, new Path("/usr/output/weibo2"));
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("执行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
刷新DFS-Location看到/usr/output/weibo2下的DF输出文件:
比如0.03元 在所有微博中出现了1次
根据公式计算微博词汇权重:
LastMapper.java
输入数据为所有词的TF,所有词的DF,微博总数N,根据这三个变量计算词条最终权重。
输出格式:微博ID 词条:权重
package com.jeff.mr.tf;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 最后计算
* @author root
*
*/
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
//存放微博总数
public static Map<String, Integer> cmap = null;
//存放df
public static Map<String, Integer> df = null;
// 在map方法执行之前,即mapperTask初始化的时候执行
/**
* mapReduce的执行过程回顾:
* 比如一个文件被分割成1024个碎片段,则一定有与之对应的1024个mapTask去执行每个碎片段。
* mapTask在有碎片段的节点上执行,即 dataNode上有碎片段,在dataNode上执行。所以每个DataNode上就
* 有一个NodeManager来执行mapReduce程序,NodeManager里面有一个与之对应的ApplicationMatser
* 负责从resourceManager中请求资源即Contianer中文是容器,其实是资源。申请资源后,ApplicationMatser
* 则可以通过一个Executor对象执行mapperTask,并监控和记录执行状态、进度等数据汇报给NodeManager,NodeManager
* 再汇报给resourceManager。
* Executor对象执行mapperTask的时候先初始化对应的MapTask,其实就是我们的LastMapper.
* java自定义的xxxMapper,只要初始化成功就调用LastMapper的setUp方法,这个时候map方法还没执行,
* map方法是循环调用的,即每一行都调用一次,但是setUp方法只会调用一次。不过1024个碎片段对应1024个mapTask,
* 就会执行setup方法1024次,还是狠多次,所以我们可以考虑从共享内存中取得一部分数据,比如微博总数N和DF记录。
* 我们使用cmap和df两个Map来存放,判断是否为空,即保证存过就不用再存了。
*
*
*/
protected void setup(Context context) throws IOException,
InterruptedException {
System.out.println("******************");
if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
URI[] ss = context.getCacheFiles();
if (ss != null) {
for (int i = 0; i < ss.length; i++) {
URI uri = ss[i];
if (uri.getPath().endsWith("part-r-00003")) {//微博总数
Path path =new Path(uri.getPath());
// FileSystem fs =FileSystem.get(context.getConfiguration());
// fs.open(path);
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line = br.readLine();
if (line.startsWith("count")) {
String[] ls = line.split("\t");
cmap = new HashMap<String, Integer>();
cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
} else if (uri.getPath().endsWith("part-r-00000")) {//词条的DF
df = new HashMap<String, Integer>();
Path path =new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line;
while ((line = br.readLine()) != null) {
String[] ls = line.split("\t");
df.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
}
}
}
}
}
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
// System.out.println("--------------------");
if (!fs.getPath().getName().contains("part-r-00003")) {
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
int tf =Integer.parseInt(v[1].trim());//tf值
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
String id=ss[1];
//根据公式计算权重,输出:微博Id 词汇1:权重1 词汇2:权重2
double s=tf * Math.log(cmap.get("count")/df.get(w));
NumberFormat nf =NumberFormat.getInstance();
nf.setMaximumFractionDigits(5);
context.write(new Text(id), new Text(w+":"+nf.format(s)));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
LastReduce.java
计算所有词条的最终权重,相同微博在后边显示其所有的词条:权重,并使用制表符\t隔开。
输出格式:微博ID 词条:权重 词条:权重
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LastReduce extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> arg1,
Context context)
throws IOException, InterruptedException {
StringBuffer sb =new StringBuffer();
for( Text i :arg1 ){
sb.append(i.toString()+"\t");
}
context.write(key, new Text(sb.toString()));
}
}
执行LastJob计算最终输出结果:
我们这里采用的是在本地提交到Linux环境下进行执行测试的
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LastJob {
public static void main(String[] args) {
Configuration config =new Configuration();
// config.set("fs.defaultFS", "hdfs://node1:8020");
// config.set("yarn.resourcemanager.hostname", "node1");
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
try {
FileSystem fs =FileSystem.get(config);
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(LastJob.class);
job.setJobName("weibo3");
// DistributedCache.addCacheFile(uri, conf);
//2.5
/**
* 之所以以下两行可以加载到内存因为微博总数的文件和df文件其实都不大,所有可以在任务启动之初先加载到内存
*/
//把微博总数N加载到内存
job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
//把df加载到内存
job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());
//设置map任务的输出key类型、value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setMapperClass();
job.setMapperClass(LastMapper.class);
job.setReducerClass(LastReduce.class);
//mr运行时的输入数据从hdfs的哪个目录中获取
FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
Path outpath =new Path("/usr/output/weibo3");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job,outpath );
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("执行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
需要做的是将工程打包放在桌面weibo3.jar,然后在LastJob中添加:
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
配置文件放在src下:
开始执行:
观察刚开始执行
观察执行完成;
刷新DFS-Location
比如:3823890239358658 继续:4.89035 支持:3.04452
表示在微博ID为3823890239358658微博中,[继续]的全部微博中权重为4.89035,[支持]的全部微博中权重为3.04452
有了这些结果,我们就可以做出一些商业或者其他领域的重要选择!
当然也可以在本地进行测试,就是在LastMapper的setUp中注释掉的代码:
FileSystem fs =FileSystem.get(context.getConfiguration());
FSDataInputStream fsdInputStream = fs.open(path);
将输入流封装进BufferedReader即可。