前面学习了简单的经典的Wordcount算法,今天我们来学习一个处理文件内容、整理数据集的算法(关系的自然连接);
用MapReduce实现关系的自然连接
- 假设有关系R(A,B)和S(B,C),对二者进行自然连接操作
- 使用Map过程,把来自R的每个元组
<a,b>
转换成一个键值对<b, <R,a>>
,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组<b,c>
,转换成一个键值对<b,<S,c>>
- 所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并
- Reduce进程的输出则是连接后的元组
自然连接过程
应用示例eg:
在HDFS中有两个文件,一个记录了学生的基本信息,包含了姓名和学号信息,名为student_info.txt,内容为:
Jenny 00001
Hardy 00002
Bradley 00003
还有一个文件记录了学生的选课信息表,包括了学号和课程名,名为student_class_info.txt,内容为:
00001 Chinese
00001 Math
00002 Music
00002 Math
00003 Physic
现在经join操作后,得出的结果为:
Jenny Chinese
Jenny Math
Hardy Music
Hardy Math
Bradley Physic
实战题目:
程序代码
Mapper
package Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
import java.util.StringTokenizer;
public class MyMap extends Mapper<LongWritable, Text, Text, Text> {
private static final String FILENAME1 = "file1.txt";//定义校验文件名
private static final String FILENAME2 = "file2.txt";
private static final String FILECLASS1 = "file1";
private static final String FILECLASS2 = "file2";
private FileSplit fs = null;//文件块对象
private Text outkey = new Text();//map阶段的输出key值容器
private Text outvalue = new Text();//map阶段的输出value值容器
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString());//String工具(快捷切分对象)
fs = (FileSplit) context.getInputSplit();//获取文件对象
if (fs.getPath().getName().equals(FILENAME1)) {//判断记录来自哪个文件
/**
* 文件1的数据处理
*/
outkey.set(st.nextToken());
outvalue.set(FILECLASS1 + "," + st.nextToken());
} else if (fs.getPath().getName().equals(FILENAME2)) {//判断记录来自哪个文件
/**
* 文件2的数据处理
*/
outkey.set(st.nextToken());
outvalue.set(FILECLASS2 + "," + st.nextToken() + "----" + st.nextToken());
}
context.write(outkey, outvalue);//传递数据
}
}
Reducer
package Reduce;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.StringTokenizer;
public class MyReduce extends Reducer<Text, Text, Text, Text> {
private static final String FILECLASS1 = "file1";//定义校对文件名
private static final String FILECLASS2 = "file2";
private String fileName = "";//文件名(K)
private String text = "";//文本内容(V)
private ArrayList<String> list = null;//用集合来暂存处理后的数据
private Text outkey = new Text();//reduce阶段的输出key值容器
private Text outvalue = new Text();//reduce阶段的输出value值容器
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
list = new ArrayList<>();
for (Text value : values) {
StringTokenizer st = new StringTokenizer(value.toString(), ",");
fileName = st.nextToken();
text = st.nextToken();
if (FILECLASS1.equals(fileName)) {
outkey.set(text);//文件1中的文本内容便是输出数据中的第一列
} else if (FILECLASS2.equals(fileName)) {
list.add(text);//文件2中的文本内容便是输出数据中的第二列
}
}
//计算笛卡尔积(分拣结果)
for (String out : list) {
outvalue.set(out);
context.write(outkey, outvalue);//输出结果数据
}
}
}
Runner
package Demo;
import Map.MyMap;
import Reduce.MyReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
public class JTest {
/**
* 全局变量
*/
static FileSystem fs = null;
static Job job = null;
static Configuration conf = new Configuration();
static String uri = "hdfs://192.168.132.130:9000";
static {//用静态代码块初始化
conf.setBoolean("dfs.support.append", true);
try {
job = Job.getInstance(conf);
fs = FileSystem.get(URI.create(uri), conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
job.setJobName("exam");//设置任务名称
job.setMapperClass(MyMap.class);//指定map类
job.setReducerClass(MyReduce.class);//指定reduce类
job.setMapOutputKeyClass(Text.class);//设置map阶段的key输出类型
job.setMapOutputValueClass(Text.class);//设置map阶段的value输出类型
job.setOutputKeyClass(Text.class);//设置reduce(最终阶段)的key输出类型
job.setOutputValueClass(Text.class);//设置reduce(最终阶段)的value输出类型
// job.setPartitionerClass(MyPartition.class);//分区
// job.setNumReduceTasks(3);//分几个区
checkFileExists(new Path(uri.concat("/output")));//调用检测文件是否存在的方法
FileInputFormat.setInputPaths(job,new Path(uri.concat("/input/")));
FileOutputFormat.setOutputPath(job,new Path(uri.concat("/output")));
job.waitForCompletion(true);
show();
fs.close();//关闭资源
}
static void checkFileExists(Path... paths) throws IOException {
/**
* 检测文件是否存在的方法。
* 避免报文件已存在的异常。
*/
for (Path path : paths) {
boolean exists = fs.exists(path);
if (exists) {
fs.delete(path,true);
}
}
}
synchronized static void show() throws IOException {
/**
* 结果展示的方法。
*/
FSDataInputStream open = fs.open(new Path(uri.concat("/output/part-r-00000")));
BufferedReader reader = new BufferedReader(new InputStreamReader(open, "utf-8"));
String read = "";
System.out.println("____________________________________分割线______________________________________");
while((read = reader.readLine())!=null){
System.out.println(read);
}
reader.close();
open.close();
}
}
运行结果
Kitty math----80
Kitty computer----56
Kitty english----78
Mosy english----69
Mosy math----76
Mosy computer----77
Jony english----88
Jony computer----84
Jony math----90
Goston math----67
Goston computer----92
Goston english----98
Tom english----56
Tom math----78
Tom computer----55
参考-->作者:CloudsStyle 原文:https://blog.csdn.net/u011026329/article/details/67632343