案例5-挖掘微博广告高权重词条

版权声明:原创文章,未经允许不得转载.Tips:传统电商火热的时代已经成为过去 , 下一个阶段属于大数据 人工智能 , 服务、便捷、安全、效率、创新成为下一个阶段互联网时代的新词汇,而IT技术也随着行业的变化发展而不断更迭。对于码农的出路总结一句话:追技术不如追领域。[基础][设计][能力] https://blog.csdn.net/shengqianfeng/article/details/83119796

微博内容(如图):ID  content

公式:

TF:词条在某个微博中出现的词频(出现次数).

N:微博总数

DF:词条在多少个微博中出现过

案例用到四个reduceTask,下标计数从0开始,三个统计词频TF,一个统计微博总数N。

 

FirstMapper.java

对输入文件的每行记录微博内容进行分词,统计微博词频TF及微博总数,每个词条输出词频数1;每个微博输出一个count=1

package com.jeff.mr.tf;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 *  TF:词条在某个微博中出现的词频(出现次数).
	N:微博总数
	DF:词条在多少个微博中出现过
	--------------------------------
 *   第一个MR,计算TF和计算N(微博总数)
 * @author root
 *
 */
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//value是微博文件每一行以制表符\t隔开
		String[]  v =value.toString().trim().split("\t");
		if(v.length>=2){
			String id=v[0].trim();
			String content =v[1].trim();
			//对微博内容进行中文分词处理
			StringReader sr =new StringReader(content);
			IKSegmenter ikSegmenter =new IKSegmenter(sr, true);
			Lexeme word=null;
			while( (word=ikSegmenter.next()) !=null ){
				String w= word.getLexemeText();//w就是微博内容的每一个词汇
				//输出格式为:key为:词汇_微博ID    value是1,出现次数
				context.write(new Text(w+"_"+id), new IntWritable(1));
			}
			//每执行一次这个方法,就表示统计了一条微博数,将来在第四个reduce分区执行,参见FirstPartition,自定义分区规则
			context.write(new Text("count"), new IntWritable(1));
		}else{
			System.out.println(value.toString()+"-------------");
		}
	}
	
	
	
}

FirstPartition.java

自定义分区,使得key为count的分区到最后一个分区(编号3),其他的分别分区编号为0/1/2三个reduceTask

package com.jeff.mr.tf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 第一个MR自定义分区,把key为count的,即用来计算微博总数的数据分区到第四个reduce分区,
 * 前三个reduce分区用来计算TF,就是单个微博中词汇出现次数
 * @author root
 *
 */
public class FirstPartition extends HashPartitioner<Text, IntWritable>{

	
	public int getPartition(Text key, IntWritable value, int reduceCount) {
		if(key.equals(new Text("count")))
			return 3;
		else
			return super.getPartition(key, value, reduceCount-1);
	}

}

FirstReduce.java

计算单个词条的词频TF,输入数据为FirstMapper.java的输出,key为词条_id.或者count,值为词频个数或者count个数,当key为count时不参与计算只输出查看。

输出格式:词条_ID 词频

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * c1_001,2
 * c2_001,1
 * count,10000
 * @author root
 *
 */
public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	protected void reduce(Text arg0, Iterable<IntWritable> arg1,
			Context arg2)
			throws IOException, InterruptedException {
		
		int sum =0;
		for( IntWritable i :arg1 ){
			sum= sum+i.get();
		}
		if(arg0.equals(new Text("count"))){
			System.out.println(arg0.toString() +"___________"+sum);
		}
		arg2.write(arg0, new IntWritable(sum));
	}

}

在dfs-location上新建路径:/usr/input/tf-idf并上传文件微博内容:

 

接下来就可以执行FirstJob.java来执行第一个MR:

package com.jeff.mr.tf;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FirstJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
		try {
			FileSystem fs =FileSystem.get(config);
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(FirstJob.class);
			job.setJobName("weibo1");
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
//			job.setMapperClass();
			job.setNumReduceTasks(4);
			job.setPartitionerClass(FirstPartition.class);
			job.setMapperClass(FirstMapper.class);
			job.setCombinerClass(FirstReduce.class);
			job.setReducerClass(FirstReduce.class);
			
			
			FileInputFormat.addInputPath(job, new Path("/usr/input/tf-idf"));
			
			Path path =new Path("/usr/output/weibo1");
			if(fs.exists(path)){
				fs.delete(path, true);
			}
			FileOutputFormat.setOutputPath(job,path);
			
			boolean f= job.waitForCompletion(true);
			if(f){
				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

执行成功:

刷新DFS-Location,看到在/usr/output/weibo1的目录下生成了四个分区文件,每一个分区文件都是四个reduceTask的输出文件

其中第四个分区文件就是用来计算Count微博总数N的,其他三个都是微博中词汇即出现次数。

比如:0.03元_3824213951437432       1

这个就表示0.03元这个词在ID为3824213951437432微博中出现了1次

 

 

TwoMapper.java

统计DF,词条在多少个微博中出现过

输出格式:词条 出现的微博个数

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
//统计df:词在多少个微博中出现过。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		/**
		 * 1  获取当前	mapper Task的数据片段(split)
		 * 2 当前mapper Task的数据来源于第一个MR输出的四个文件
		 */
		FileSplit fs = (FileSplit) context.getInputSplit();
		//可以从fs获取第一个MR的文件名,除了最后一个文件是用来计算微博总数的,其他都是TF
		if (!fs.getPath().getName().contains("part-r-00003")) {
			String[] v = value.toString().trim().split("\t");
			if (v.length >= 2) {
				//获取{0.03元_3824213951437432	1},这种第一个MR的输出数据,即每一行
				String[] ss = v[0].split("_");
				if (ss.length >= 2) {
					String w = ss[0];//得到每一个词汇,输出次数1,此处所有微博的词汇都会输出1次
					context.write(new Text(w), new IntWritable(1));
				}
			} else {
				System.out.println(value.toString() + "-------------");
			}
		}

	}
}

TwoReduce.java

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 计算词汇在所有微博中出现的次数
 * @author jeffSheng
 * 2018年10月17日
 */
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * 输入数据:
	 *    key:0.03元	  value:1(次)
	 * Iterable<IntWritable> arg1,即key相等的一组数据
	 */
	protected void reduce(Text key, Iterable<IntWritable> arg1,Context context)
												throws IOException, InterruptedException {
		int sum =0;
		for( IntWritable i :arg1 ){
			sum= sum + i.get();
		}
		context.write(key, new IntWritable(sum));
	}

}

执行TwoJob.java第二个MR,计算每个词汇在所有微博出现次数即DF

package com.jeff.mr.tf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TwoJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
		try {
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(TwoJob.class);
			job.setJobName("weibo2");
			//设置map任务的输出key类型、value类型
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
//			job.setMapperClass();
			job.setMapperClass(TwoMapper.class);
			job.setCombinerClass(TwoReduce.class);
			job.setReducerClass(TwoReduce.class);
			
			//mr运行时的输入数据从hdfs的哪个目录中获取
			FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
			FileOutputFormat.setOutputPath(job, new Path("/usr/output/weibo2"));
			
			boolean f= job.waitForCompletion(true);
			if(f){
				System.out.println("执行job成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

刷新DFS-Location看到/usr/output/weibo2下的DF输出文件:

比如0.03元 在所有微博中出现了1次

 

根据公式计算微博词汇权重:

LastMapper.java

输入数据为所有词的TF,所有词的DF,微博总数N,根据这三个变量计算词条最终权重。

输出格式:微博ID 词条:权重

package com.jeff.mr.tf;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 最后计算
 * @author root
 *
 */
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
	//存放微博总数
	public static Map<String, Integer> cmap = null;
	//存放df
	public static Map<String, Integer> df = null;

	// 在map方法执行之前,即mapperTask初始化的时候执行
	/**
	 * mapReduce的执行过程回顾:
	 * 比如一个文件被分割成1024个碎片段,则一定有与之对应的1024个mapTask去执行每个碎片段。
	 * mapTask在有碎片段的节点上执行,即 dataNode上有碎片段,在dataNode上执行。所以每个DataNode上就
	 * 有一个NodeManager来执行mapReduce程序,NodeManager里面有一个与之对应的ApplicationMatser
	 * 负责从resourceManager中请求资源即Contianer中文是容器,其实是资源。申请资源后,ApplicationMatser
	 * 则可以通过一个Executor对象执行mapperTask,并监控和记录执行状态、进度等数据汇报给NodeManager,NodeManager
	 * 再汇报给resourceManager。
	 * Executor对象执行mapperTask的时候先初始化对应的MapTask,其实就是我们的LastMapper.
	 * java自定义的xxxMapper,只要初始化成功就调用LastMapper的setUp方法,这个时候map方法还没执行,
	 * map方法是循环调用的,即每一行都调用一次,但是setUp方法只会调用一次。不过1024个碎片段对应1024个mapTask,
	 * 就会执行setup方法1024次,还是狠多次,所以我们可以考虑从共享内存中取得一部分数据,比如微博总数N和DF记录。
	 * 我们使用cmap和df两个Map来存放,判断是否为空,即保证存过就不用再存了。
	 * 
	 * 
	 */
	protected void setup(Context context) throws IOException,
			InterruptedException {
		System.out.println("******************");
		if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
			URI[] ss = context.getCacheFiles();
			if (ss != null) {
				for (int i = 0; i < ss.length; i++) {
					URI uri = ss[i];
					if (uri.getPath().endsWith("part-r-00003")) {//微博总数
						Path path =new Path(uri.getPath());
//						FileSystem fs =FileSystem.get(context.getConfiguration());
//						fs.open(path);
						BufferedReader br = new BufferedReader(new FileReader(path.getName()));
						String line = br.readLine();
						if (line.startsWith("count")) {
							String[] ls = line.split("\t");
							cmap = new HashMap<String, Integer>();
							cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
						}
						br.close();
					} else if (uri.getPath().endsWith("part-r-00000")) {//词条的DF
						df = new HashMap<String, Integer>();
						Path path =new Path(uri.getPath());
						BufferedReader br = new BufferedReader(new FileReader(path.getName()));
						String line;
						while ((line = br.readLine()) != null) {
							String[] ls = line.split("\t");
							df.put(ls[0], Integer.parseInt(ls[1].trim()));
						}
						br.close();
					}
				}
			}
		}
	}

	
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		FileSplit fs = (FileSplit) context.getInputSplit();
//		System.out.println("--------------------");
		if (!fs.getPath().getName().contains("part-r-00003")) {
			String[] v = value.toString().trim().split("\t");
			if (v.length >= 2) {
				int tf =Integer.parseInt(v[1].trim());//tf值
				String[] ss = v[0].split("_");
				if (ss.length >= 2) {
					String w = ss[0];
					String id=ss[1];
					//根据公式计算权重,输出:微博Id  词汇1:权重1 词汇2:权重2  
					double s=tf * Math.log(cmap.get("count")/df.get(w));
					NumberFormat nf =NumberFormat.getInstance();
					nf.setMaximumFractionDigits(5);
					context.write(new Text(id), new Text(w+":"+nf.format(s)));
				}
			} else {
				System.out.println(value.toString() + "-------------");
			}
		}
	}
}

LastReduce.java

计算所有词条的最终权重,相同微博在后边显示其所有的词条:权重,并使用制表符\t隔开。

输出格式:微博ID  词条:权重  词条:权重

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReduce extends Reducer<Text, Text, Text, Text>{
	
	protected void reduce(Text key, Iterable<Text> arg1,
			Context context)
			throws IOException, InterruptedException {
		
		StringBuffer sb =new StringBuffer();
		
		for( Text i :arg1 ){
			sb.append(i.toString()+"\t");
		}
		
		context.write(key, new Text(sb.toString()));
	}

}

 

执行LastJob计算最终输出结果:

我们这里采用的是在本地提交到Linux环境下进行执行测试的

package com.jeff.mr.tf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class LastJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
//		config.set("fs.defaultFS", "hdfs://node1:8020");
//		config.set("yarn.resourcemanager.hostname", "node1");
		config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
		try {
			FileSystem fs =FileSystem.get(config);
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(LastJob.class);
			job.setJobName("weibo3");
			
//			DistributedCache.addCacheFile(uri, conf);
			//2.5
			/**
			 * 之所以以下两行可以加载到内存因为微博总数的文件和df文件其实都不大,所有可以在任务启动之初先加载到内存
			 */
			//把微博总数N加载到内存
			job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
			//把df加载到内存
			job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());
			
			//设置map任务的输出key类型、value类型
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
//			job.setMapperClass();
			job.setMapperClass(LastMapper.class);
			job.setReducerClass(LastReduce.class);
			
			//mr运行时的输入数据从hdfs的哪个目录中获取
			FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
			Path outpath =new Path("/usr/output/weibo3");
			if(fs.exists(outpath)){
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job,outpath );
			
			boolean f= job.waitForCompletion(true);
			if(f){
				System.out.println("执行job成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

需要做的是将工程打包放在桌面weibo3.jar,然后在LastJob中添加:

config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");

配置文件放在src下:

开始执行:

打开:http://node1:18088/cluster

观察刚开始执行

观察执行完成;

刷新DFS-Location

比如:3823890239358658     继续:4.89035  支持:3.04452 

表示在微博ID为3823890239358658微博中,[继续]的全部微博中权重为4.89035,[支持]的全部微博中权重为3.04452

有了这些结果,我们就可以做出一些商业或者其他领域的重要选择!

 

当然也可以在本地进行测试,就是在LastMapper的setUp中注释掉的代码:

FileSystem fs =FileSystem.get(context.getConfiguration());

FSDataInputStream fsdInputStream = fs.open(path);

将输入流封装进BufferedReader即可。

 

 

 

 

 

 

 

 

猜你喜欢

转载自blog.csdn.net/shengqianfeng/article/details/83119796