实验九 MapReduce实验:分布式缓存

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Will_cruise/article/details/88751934

9.1 实验目的

理解序列化与反序列化;熟悉Configuration类;学会使用Configuration类进行参数传递;学会在Map或Reduce阶段引用Configuration传来的参数;理解分布式缓存“加载小表、扫描大表”的处理思想。

9.2 实验要求

假定现有一个大为100G的大表big.txt和一个大小为1M的小表small.txt,请基于MapReduce思想编程实现判断小表中单词在大表中出现次数。也即所谓的“扫描大表、加载小表”。

9.3 实验步骤

为解决上述问题,可开启10个Map、这样,每个Map只需处理总量的1/10,将大大加快处理。而在单独Map内,直接用HashSet加载“1M小表”,对于存在硬盘(Map处理时会将HDFS文件拷贝至本地)的10G大文件,则逐条扫描,这就是所谓的“扫描大表、加载小表”,也即分布式缓存,如图9-1所示。

图9-1 加载小表扫描大表示意图

由于实验中没有100G这样的大表,甚至连1M这样的小表都没有,因为本实验采用模拟方式,用少量数据代表大文件big.txt,更少量数据代表small.txt。整个实验步骤为“准备数据上传数据编写代码执行代码查看结果”这五大步骤。

9.3.1 准备数据

为降低操作难度,此处用少量数据代表大文件big.txt,更少量数据代表小文件small.txt,具体操作如下:

首先,登录master机,确认该机上存在“/root/data/9/big.txt”和“/root/data/9/ small.txt”,如图9-2所示,显然big.txt内容为“aaa~zzz和000~999”,small.txt为其中三项。

图9-2 确认本地文件big.txt和small.txt

9.3.2 上传数据

首先,登录master机,查看HDFS里是否已存在目录“/user/root/mr/in”,若不存在,使用下述命令新建该目录。

[root@master ~]# /usr/cstor/hadoop/bin/hdfs  dfs  -mkdir  -p  /user/root/mr/in

接着,使用下述命令将master机本地文件“/root/data/9/big.txt”和“/root/data/9/small.txt”上传至HDFS的“/user/root/mr/in”目录:

[root@master ~]# /usr/cstor/hadoop/bin/hdfs  dfs  -put  /root/data/9/big.txt  /user/root/mr/in
[root@master ~]# /usr/cstor/hadoop/bin/hdfs  dfs  -put  /root/data/9/small.txt  /user/root/mr/in

最后,使用下述命令确认HDFS上文件与内容,如图9-3所示。

图9-3 确认HDFS文件big.txt和small.txt

9.3.3 编写代码

首先,打开Eclipse,依次点击“FileNewOther…Map/Reduce Project”,在弹出的“New MapReduce Project Wizard”对话框中,“Project name:”一栏填写项目名“BigSmallTable”,然后直接点击该对话框的“Finish”按钮,如图9-4所示。

图9-4 确认文件big.txt和small.txt

接着,新建BigAndSmallTable类并指定包名(代码中为cn.cstor.mr),在BigAndSmallTable.java文件中,依次写入如下代码:

package cn.cstor.mr;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;

public class BigAndSmallTable {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private static HashSet<String> smallTable = null;

protected void setup(Context context) throws IOException,
InterruptedException {
smallTable = new HashSet<String>();
Path smallTablePath = new Path(context.getConfiguration().get(
"smallTableLocation"));
FileSystem hdfs = smallTablePath.getFileSystem(context
.getConfiguration());
FSDataInputStream hdfsReader = hdfs.open(smallTablePath);
Text line = new Text();
LineReader lineReader = new LineReader(hdfsReader);
while (lineReader.readLine(line) > 0) {
// you can do something here
String[] values = line.toString().split(" ");
for (int i = 0; i < values.length; i++) {
smallTable.add(values[i]);
System.out.println(values[i]);
}
}
lineReader.close();
hdfsReader.close();
System.out.println("setup ok *^_^* ");
}

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] values = value.toString().split(" ");
for (int i = 0; i < values.length; i++) {
if (smallTable.contains(values[i])) {
context.write(new Text(values[i]), one);
}
}
}
}

public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("smallTableLocation", args[1]);
Job job = Job.getInstance(conf, "BigAndSmallTable");
job.setJarByClass(BigAndSmallTable.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

图9-5位本项目结构图,请读者对照该图,分析项目结构图中各模块。

图9-5 项目开发过程示例

待代码编写结束,选中该项目,依次点击“ExportJavaJAR file”,弹出对话框如图9-6中填写打包位置,接着Finish即可。笔者此处打包时包名及其位置为“C:\Users\allen\Desktop\BigSmallTable.jar”。

图9-6 项目开发过程示例

9.3.4 执行代码

首先,使用“Xmanager Enterprise 5”将“C:\Users\allen\ Desktop\BigSmallTable.jar”上传至master机。此处上传至“/root/BigSmallTable.jar”

接着,登录master机上,使用下述命令提交BigSmallTable.jar任务。

[root@master ~]# /usr/cstor/hadoop/bin/hadoop  jar  /root/BigSmallTable.jar \
cn.cstor.mr.BigAndSmallTable  /user/root/mr/in/big.txt \
/user/root/mr/in/small.txt  /user/root/mr/bigAndSmallResult

9.3.5 查看结果

程序执行后,可使用下述命令查看执行结果,注意若再次执行,请更改结果目录:

[root@master ~]# /usr/cstor/hadoop/bin/hdfs  dfs  -cat  /user/root/mr/bigAndSmallResult/part-r-00000

9.4 实验结果

实验结果如图9-7所示,根据big.txt,small.txt文件内容和编程目的,易知实验结果准确无误。

图9-7 查看实验结果

实验操作:

步骤一:搭建Hadoop集群

步骤2:上传数据文件至HDFS

步骤3:编写分布式缓存程序

步骤4:打包程序

步骤5:运行程序

步骤6:查看运行结果

猜你喜欢

转载自blog.csdn.net/Will_cruise/article/details/88751934