


               实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志” join “产品信息”






package com.empire.hadoop.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // 用一个hashmap来加载保存产品信息表
        Map<String, String> pdInfoMap = new HashMap<String, String>();

        Text                k         = new Text();

         * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split("\t");
                pdInfoMap.put(fields[0], fields[2]);

        // 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String orderLine = value.toString();
            String[] fields = orderLine.split("\t");
            String pdName = pdInfoMap.get(fields[1]);
            k.set(orderLine + "\t" + pdName);
            context.write(k, NullWritable.get());


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);



        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 指定需要缓存一个文件到所有的maptask运行节点工作目录
        /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
        /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
        /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
        /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录

        // 将产品表文件缓存到task工作节点的工作目录中去
        //job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));
        job.addCacheFile(new URI("hdfs://centos-aaron-h1:9000/rjoin/mapjoincache/product.txt"));


        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);






lcd d:/
put  mapsidejoin.jar


cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs  -mkdir -p /rjoin/mapjoinsideinput
hadoop fs  -mkdir -p /rjoin/mapjoincache
hdfs dfs -put  order.txt  /rjoin/mapjoinsideinput
hdfs dfs -put  product.txt  /rjoin/mapjoincache


hadoop jar mapsidejoin.jar  com.empire.hadoop.mr.mapsidejoin.MapSideJoin /rjoin/mapjoinsideinput /rjoin/mapjoinsideoutput        


[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=189612
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=218
		HDFS: Number of bytes written=108
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3057
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=3057
		Total vcore-milliseconds taken by all map tasks=3057
		Total megabyte-milliseconds taken by all map tasks=3130368
	Map-Reduce Framework
		Map input records=4
		Map output records=4
		Input split bytes=125
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=99
		CPU time spent (ms)=350
		Physical memory (bytes) snapshot=117669888
		Virtual memory (bytes) snapshot=845942784
		Total committed heap usage (bytes)=16121856
	File Input Format Counters 
		Bytes Read=93
	File Output Format Counters 
		Bytes Written=108
[hadoop@centos-aaron-h1 ~]$  hdfs dfs -cat  /rjoin/mapjoinsideoutput/part-m-00000
1001    20150710        P0001   2       小米5
1002    20150710        P0001   3       小米5
1002    20150710        P0002   3       锤子T1
1003    20150710        P0003   3       锤子


