package com.ghgj.cn.mapjoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//这种方法是小文件加载,匹配大文件,不需要reduce,所有将reducetask关掉
public class MapJoin {
static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
Text mk=new Text();
Text mv=new Text();
Map<String,String> map=new HashMap<String,String>();
//读取movies这个表 将表中的所有数据先加载过来
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//进行movies文件的读取
Path[] localCacheFiles = context.getLocalCacheFiles();
//缓存文件路径
Path path=localCacheFiles[0];
//创建一个字符流
BufferedReader br=new BufferedReader(new FileReader(path.toString()));
String line=null;
while((line=br.readLine())!=null){
//1::Toy Story (1995)::Animation|Children's|Comedy
String[] split = line.split("::");
map.put(split[0], split[1]+"\t"+split[2]);
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
//先读取ratings文件 每次读取一行 和map集合中的数据进行关联
// 1::1193::5::978300760 ratings.dat
String[] split = value.toString().split("::");
String k=split[1];
if(map.containsKey(k)){
//进行关联 取出map的value 和 现在的数据进行关联
String res=map.get(k)+"\t"+split[0]+"\t"+split[2]+"\t"+split[3];
mk.set(k);
mv.set(res);
context.write(mk, mv);
}
}
}
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
System.setProperty("HADOOP_USER_NAME", "hadoop");
//加载配置文件
Configuration conf=new Configuration();
//启动一个job 这里的mr任务叫做job 这个job作用 封装mr任务
Job job=Job.getInstance(conf);
//指定当前任务的主类 jarclass=Driver.class
job.setJarByClass(MapJoin.class);
//指定map
job.setMapperClass(MyMapper.class);
/*
* 泛型:jdk1.5 泛型的编译的时候生效 检查代码的类型是否匹配 运行的时候自动擦除
*/
//指定map输出的key value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//将指定的文件加载到每一个运行计算任务节点的缓存中
job.addCacheFile(new URI("hdfs://hadoop01:9000/join_in/movies.dat"));
//不需要reducetask设置这里
job.setNumReduceTasks(0);
//修改切片的大小
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/join_in/ratings.dat"));
//指定输出目录 输出路径不能存在 否则报错 代码运行的饿时候 会帮你创建
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/mapjoin_out01"));
//提交job 参数:是否打印执行日志
job.waitForCompletion(true);
}
}
MapReduce 之 ---MapJoin
猜你喜欢
转载自blog.csdn.net/YZY_001/article/details/82320762
今日推荐
周排行