The previous post covered a join implemented with a full map-and-reduce job; in this post I will walk through a map-side join implementation.
1. Requirement
Join two "tables" where one table is small and the other is very large. This scenario is extremely common in practice, for example joining an "order log" against a "product info" table.
2. Analysis
--Principle: this approach suits joins where one of the tables is small. The small table can be distributed to every map node, so each map node can join the large-table records it reads against the small table locally and emit the final result directly. This greatly increases the parallelism of the join and speeds up processing.
--Approach: load the small table into memory in the mapper class ahead of time, then perform the join there.
--The DistributedCache mechanism ships the small table's data to every node that runs a map task, so each map task can load the small table from its local working directory and complete the join locally.
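The mechanics are just an in-memory hash join: build a lookup map from the small table, then stream the large table and probe the map once per record. A minimal plain-Java sketch of the idea, independent of Hadoop and using invented sample product names:

```java
import java.util.HashMap;
import java.util.Map;

public class HashJoinSketch {
    // Small table kept in memory, as each map task would hold it after setup()
    static Map<String, String> products = new HashMap<>();

    // Probe phase: one hash lookup per order line, no shuffle needed.
    // The product id is assumed to be the third tab-separated column.
    static String joinLine(String orderLine) {
        String[] fields = orderLine.split("\t");
        return orderLine + "\t" + products.get(fields[2]);
    }

    public static void main(String[] args) {
        // Hypothetical sample data for illustration only
        products.put("P0001", "phoneA");
        products.put("P0002", "phoneB");
        System.out.println(joinLine("1001\t20150710\tP0001\t2"));
        System.out.println(joinLine("1002\t20150710\tP0002\t3"));
    }
}
```

The real job below follows the same two phases: `setup()` plays the role of the build phase and `map()` the probe phase.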
3. Code
package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // A HashMap that holds the (small) product info table: product id -> product name
        Map<String, String> pdInfoMap = new HashMap<String, String>();
        Text k = new Text();

        /**
         * Reading the source of the parent Mapper class shows that setup() is
         * called once before the map task starts processing data, so it is a
         * good place for initialization work. Here we load the cached product
         * file (shipped to the task's working directory via job.addCacheFile)
         * into the in-memory map.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
            String line;
            while ((line = br.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines rather than stopping at the first one
                }
                String[] fields = line.split("\t");
                pdInfoMap.put(fields[0], fields[2]); // product id -> product name
            }
            br.close();
        }

        // Since each map task holds the complete product table, the join
        // logic can be implemented right inside the map() method.
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            // The product id is the third column of the order line
            // (cf. the output in section 6: order id, date, product id, amount)
            String pdName = pdInfoMap.get(fields[2]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapSideJoin.class);
        //job.setJar("D:/mapsidejoin.jar");
        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Ways to ship a file to the nodes that run the map tasks:
        /* job.addArchiveToClassPath(archive); */ // cache a jar on the task nodes' classpath
        /* job.addFileToClassPath(file); */       // cache an ordinary file on the task nodes' classpath
        /* job.addCacheArchive(uri); */           // cache an archive in the task nodes' working directory
        /* job.addCacheFile(uri); */              // cache an ordinary file in the task nodes' working directory

        // Cache the product table file in each task's working directory
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));

        // A map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
4. Running the program
#Upload the jar
Alt+p
lcd d:/
put mapsidejoin.jar
#Prepare the data files for Hadoop to process
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /rjoin/mapjoinsideinput
hadoop fs -mkdir -p /rjoin/mapjoincache
hdfs dfs -put order.txt /rjoin/mapjoinsideinput
hdfs dfs -put product.txt /rjoin/mapjoincache
#Run the mapsidejoin program
hadoop jar mapsidejoin.jar com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput
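For reference, sample input files consistent with the output shown in section 6 can be created as follows. The column layout is an assumption matching the indexes the mapper uses (the middle column of product.txt stands in for whatever attribute the real file carries):

```shell
# product.txt: product id <TAB> (other attribute) <TAB> product name
printf 'P0001\t1\t小米5\nP0002\t1\t锤子T1\nP0003\t1\t锤子\n' > product.txt
# order.txt: order id <TAB> date <TAB> product id <TAB> amount
printf '1001\t20150710\tP0001\t2\n1002\t20150710\tP0001\t3\n1002\t20150710\tP0002\t3\n1003\t20150710\tP0003\t3\n' > order.txt
```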
5. Run log
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #87 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #87
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 36ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=189612
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=218
		HDFS: Number of bytes written=108
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3057
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=3057
		Total vcore-milliseconds taken by all map tasks=3057
		Total megabyte-milliseconds taken by all map tasks=3130368
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Input split bytes=125
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=99
		CPU time spent (ms)=350
		Physical memory (bytes) snapshot=117669888
		Virtual memory (bytes) snapshot=845942784
		Total committed heap usage (bytes)=16121856
	File Input Format Counters
		Bytes Read=93
	File Output Format Counters
		Bytes Written=108
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop sending #88 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1880635722) connection to centos-aaron-h2/192.168.29.145:37772 from hadoop got value #88
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 0ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@303c7016
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.
6. Results
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /rjoin/mapjoinsideoutput/part-m-00000
1001 20150710 P0001 2 小米5
1002 20150710 P0001 3 小米5
1002 20150710 P0002 3 锤子T1
1003 20150710 P0003 3 锤子
A final word: that is all for this post. If you found it useful, please give it a like; if you are interested in the blogger's other server and big-data articles, or in the blogger himself, follow this blog, and you are welcome to reach out and exchange ideas at any time.