第7章 MapReduce进阶
7.4 MapReduce 连接
连接操作,也就是常说的join操作,是数据分析时经常用到的操作。
比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。
7.4.1 准备数据
这里准备了Oracle数据库中的经典数据。
dept.txt文件存放部门数据。
[root@node1 data]# cat dept.txt
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
[root@node1 data]#
emp.txt文件存放雇员数据。
[root@node1 data]# cat emp.txt
7369,SMITH,CLERK,7902,17-12-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2-81,1250,500,30
7566,JONES,MANAGER,7839,02-4-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5-81,2850,,30
7782,CLARK,MANAGER,7839,09-6-81,2450,,10
7839,KING,PRESIDENT,,17-11-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30
7900,JAMES,CLERK,7698,03-12-81,950,,30
7902,FORD,ANALYST,7566,03-12-81,3000,,20
7934,MILLER,CLERK,7782,23-1-82,1300,,10
上传到HDFS
hdfs dfs -mkdir -p input
hdfs dfs -put emp.txt input
hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put emp.txt input
[root@node1 data]# hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -ls input
Found 2 items
-rw-r--r-- 3 root hbase 82 2017-06-23 11:04 input/dept.txt
-rw-r--r-- 3 root hbase 513 2017-06-23 11:04 input/emp.txt
[root@node1 data]#
7.4.2 问题描述
求解每个雇员所在部门,输出格式:雇员名,部门名
比如
RESEARCH,SMITH
SALES,ALLEN
7.4.3 编程
这个问题与SQL中的连接操作类似,将问题转换未1:N问题。
一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段
具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。
(1)定义Mapper类
package cn.hadron.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//当前读取文件的路径
String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
String joinKey="";
String joinValue="";
String fileFlag="";
String[] array=value.toString().split(",");
//判定当前行数据来自哪个文件
if(filePath.contains("dept.txt")){
fileFlag="l";//left
joinKey=array[0];//部门编号
joinValue=array[1];//部门名
}else if(filePath.contains("emp.txt")){
fileFlag="r";//right
joinKey=array[array.length-1];//部门编号
joinValue=array[1];//雇员名
}
//输出键值对,并标记来源文件
context.write(new Text(joinKey),new Text(joinValue+","+fileFlag));
}
}
(2)定义Reducer类
package cn.hadron.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
//相同部门的数据,发送到同一个reduce
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
Iterator<Text> it=values.iterator();
String deptName="";
List<String> empNames=new ArrayList<>();
while(it.hasNext()){
//取一行记录
String[] array=it.next().toString().split(",");
//判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息
if("l".equals(array[1])){//只有1条记录的flag=l
deptName=array[0];
}else if("r".equals(array[1])){
empNames.add(array[0]);
}
}
//求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代
for(String en:empNames){
context.write(new Text(deptName), new Text(en));
}
}
}
(3)主方法
package cn.hadron.mr.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) {
// 设置环境变量HADOOP_USER_NAME,其值是root
System.setProperty("HADOOP_USER_NAME", "root");
// Configuration类包含了Hadoop的配置
Configuration config = new Configuration();
// 设置fs.defaultFS
config.set("fs.defaultFS", "hdfs://192.168.80.131:8020");
// 设置yarn.resourcemanager节点
config.set("yarn.resourcemanager.hostname", "node1");
try {
FileSystem fs = FileSystem.get(config);
Job job = Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("JoinDemo");
// 设置Mapper和Reducer类
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
// 设置reduce方法输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定输入输出路径
FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
Path outpath = new Path("/user/root/output/");
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
// 提交任务,等待执行完成
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("job任务执行成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4.4 运行
run as –> Java Application
Eclipse控制台输出信息:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
job任务执行成功
查看结果
hdfs dfs -ls output
[root@node1 ~]# hdfs dfs -ls output
Found 2 items
-rw-r--r-- 3 root hbase 0 2017-06-23 14:05 output/_SUCCESS
-rw-r--r-- 3 root hbase 168 2017-06-23 14:05 output/part-r-00000
[root@hds117 ~]# hdfs dfs -cat output/part-r-00000
ACCOUNTING MILLER
ACCOUNTING KING
ACCOUNTING CLARK
RESEARCH FORD
RESEARCH JONES
RESEARCH SMITH
SALES JAMES
SALES TURNER
SALES BLAKE
SALES MARTIN
SALES WARD
SALES ALLEN
第7章 MapReduce进阶
7.4 MapReduce 连接
连接操作,也就是常说的join操作,是数据分析时经常用到的操作。
比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。
7.4.1 准备数据
这里准备了Oracle数据库中的经典数据。
dept.txt文件存放部门数据。
[root@node1 data]# cat dept.txt
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
[root@node1 data]#
emp.txt文件存放雇员数据。
[root@node1 data]# cat emp.txt
7369,SMITH,CLERK,7902,17-12-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2-81,1250,500,30
7566,JONES,MANAGER,7839,02-4-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5-81,2850,,30
7782,CLARK,MANAGER,7839,09-6-81,2450,,10
7839,KING,PRESIDENT,,17-11-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30
7900,JAMES,CLERK,7698,03-12-81,950,,30
7902,FORD,ANALYST,7566,03-12-81,3000,,20
7934,MILLER,CLERK,7782,23-1-82,1300,,10
上传到HDFS
hdfs dfs -mkdir -p input
hdfs dfs -put emp.txt input
hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put emp.txt input
[root@node1 data]# hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -ls input
Found 2 items
-rw-r--r-- 3 root hbase 82 2017-06-23 11:04 input/dept.txt
-rw-r--r-- 3 root hbase 513 2017-06-23 11:04 input/emp.txt
[root@node1 data]#
7.4.2 问题描述
求解每个雇员所在部门,输出格式:雇员名,部门名
比如
RESEARCH,SMITH
SALES,ALLEN
7.4.3 编程
这个问题与SQL中的连接操作类似,将问题转换未1:N问题。
一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段
具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。
(1)定义Mapper类
package cn.hadron.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//当前读取文件的路径
String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
String joinKey="";
String joinValue="";
String fileFlag="";
String[] array=value.toString().split(",");
//判定当前行数据来自哪个文件
if(filePath.contains("dept.txt")){
fileFlag="l";//left
joinKey=array[0];//部门编号
joinValue=array[1];//部门名
}else if(filePath.contains("emp.txt")){
fileFlag="r";//right
joinKey=array[array.length-1];//部门编号
joinValue=array[1];//雇员名
}
//输出键值对,并标记来源文件
context.write(new Text(joinKey),new Text(joinValue+","+fileFlag));
}
}
(2)定义Reducer类
package cn.hadron.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
//相同部门的数据,发送到同一个reduce
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
Iterator<Text> it=values.iterator();
String deptName="";
List<String> empNames=new ArrayList<>();
while(it.hasNext()){
//取一行记录
String[] array=it.next().toString().split(",");
//判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息
if("l".equals(array[1])){//只有1条记录的flag=l
deptName=array[0];
}else if("r".equals(array[1])){
empNames.add(array[0]);
}
}
//求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代
for(String en:empNames){
context.write(new Text(deptName), new Text(en));
}
}
}
(3)主方法
package cn.hadron.mr.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) {
// 设置环境变量HADOOP_USER_NAME,其值是root
System.setProperty("HADOOP_USER_NAME", "root");
// Configuration类包含了Hadoop的配置
Configuration config = new Configuration();
// 设置fs.defaultFS
config.set("fs.defaultFS", "hdfs://192.168.80.131:8020");
// 设置yarn.resourcemanager节点
config.set("yarn.resourcemanager.hostname", "node1");
try {
FileSystem fs = FileSystem.get(config);
Job job = Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("JoinDemo");
// 设置Mapper和Reducer类
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
// 设置reduce方法输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定输入输出路径
FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
Path outpath = new Path("/user/root/output/");
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
// 提交任务,等待执行完成
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("job任务执行成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
7.4.4 运行
run as –> Java Application
Eclipse控制台输出信息:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
job任务执行成功
查看结果
hdfs dfs -ls output
[root@node1 ~]# hdfs dfs -ls output
Found 2 items
-rw-r--r-- 3 root hbase 0 2017-06-23 14:05 output/_SUCCESS
-rw-r--r-- 3 root hbase 168 2017-06-23 14:05 output/part-r-00000
[root@hds117 ~]# hdfs dfs -cat output/part-r-00000
ACCOUNTING MILLER
ACCOUNTING KING
ACCOUNTING CLARK
RESEARCH FORD
RESEARCH JONES
RESEARCH SMITH
SALES JAMES
SALES TURNER
SALES BLAKE
SALES MARTIN
SALES WARD
SALES ALLEN