1. Purpose: sometimes you need to join records from two files, e.g. a user file holding user profiles and a purchase file holding those same users' orders. To combine each user's profile with their orders, the two files must be stitched together.
2. Approach: first read one file's contents into an in-memory container for fast lookup; then use the attribute the two files share (the user id) to connect each user's records from the other file.
3. Code (the listing below covers a common special case, the map-side join, which saves both time and space when one of the two tables is small enough to fit in memory):
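For concreteness, here is the record layout the mapper below assumes. The field positions are inferred from the split("::") indexing in the code and happen to match the MovieLens dataset layout; the sample values are purely illustrative:

    users file (small table), uid::gender::age::... :
        1::F::1::10::48067
    ratings file (big table), uid::movieId::rating::... :
        1::1193::5::978300760

Joining these two sample lines on uid = 1 would produce:
    JoinBean [uid=1, age=1, gender=F, movieId=1193, rating=5, table=null]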
(1)
// JoinBean: a custom bean whose serialization is implemented via Hadoop's Writable interface
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class JoinBean implements Writable {
    private String uid;
    private String age;
    private String gender;
    private String movieId;
    private String rating;
    private String table;

    public void set(String uid, String age, String gender, String movieId, String rating, String table) {
        this.uid = uid;
        this.age = age;
        this.gender = gender;
        this.movieId = movieId;
        this.rating = rating;
        this.table = table;
    }

    // Serialization: the field order here must match readFields() exactly
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeUTF(age);
        out.writeUTF(gender);
        out.writeUTF(movieId);
        out.writeUTF(rating);
        out.writeUTF(table);
    }

    // Deserialization: read the fields back in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readUTF();
        age = in.readUTF();
        gender = in.readUTF();
        movieId = in.readUTF();
        rating = in.readUTF();
        table = in.readUTF();
    }

    @Override
    public String toString() {
        return "JoinBean [uid=" + uid + ", age=" + age + ", gender=" + gender + ", movieId=" + movieId + ", rating="
                + rating + ", table=" + table + "]";
    }

    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }
    public String getAge() {
        return age;
    }
    public void setAge(String age) {
        this.age = age;
    }
    public String getGender() {
        return gender;
    }
    public void setGender(String gender) {
        this.gender = gender;
    }
    public String getMovieId() {
        return movieId;
    }
    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }
    public String getRating() {
        return rating;
    }
    public void setRating(String rating) {
        this.rating = rating;
    }
    public String getTable() {
        return table;
    }
    public void setTable(String table) {
        this.table = table;
    }
}
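Since a Writable bean fails in confusing ways when write() and readFields() disagree on field order, a quick local round-trip check is worth the minute it takes. A minimal sketch (the test class name and sample values are illustrative, not part of the original job):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class JoinBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        JoinBean before = new JoinBean();
        before.set("1", "1", "F", "1193", "5", "null");
        // Serialize with write() into an in-memory buffer
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));
        // Deserialize with readFields() and compare the printouts
        JoinBean after = new JoinBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(before);
        System.out.println(after); // should print the same line as above
    }
}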
(2) JoinMR (launched from Eclipse and submitted to run on the cluster; see the three ways of launching a job on a cluster covered in earlier posts)
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Joins records from two different files that share the same uid.
 *
 * Approach (map-side join): in setup(), each mapper reads the small user
 * table from HDFS once and caches it in a HashMap keyed by uid; in map(),
 * each rating record's uid is looked up in that map and the combined
 * JoinBean is emitted directly. No reduce phase is needed.
 * @author Hailong
 */
public class JoinMR {

    public static class MapTask extends Mapper<LongWritable, Text, JoinBean, NullWritable> {

        // uid -> full user record, cached from the small table
        Map<String, String> map = new HashMap<>();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Read the small user table once per mapper and cache it in the HashMap
            Configuration conf = context.getConfiguration();
            String smallTableName = conf.get("smallTableName");
            FileSystem fs = FileSystem.get(conf);
            FSDataInputStream open = fs.open(new Path(smallTableName));
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line = null;
            while ((line = br.readLine()) != null) {
                String[] split = line.split("::");
                map.put(split[0], line);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Look up the cached user record by uid and stitch the two records together
            String[] split = value.toString().split("::");
            String user = map.get(split[0]);
            if (user == null) {
                return; // no matching user in the small table; skip this rating
            }
            String[] line = user.split("::");
            JoinBean joinBean = new JoinBean();
            // user file: uid::gender::age::...   rating file: uid::movieId::rating::...
            joinBean.set(split[0], line[2], line[1], split[1], split[2], "null");
            context.write(joinBean, NullWritable.get());
        }
    }
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        // 1. The default file system the job will access at runtime
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        // 2. Where the job is submitted to run
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hadoop01");
        // 3. Required when submitting the job from a Windows client
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Pass the small table's HDFS path to the mappers via the configuration
        conf.set("smallTableName", args[2]);
        Job job = Job.getInstance(conf);
        // 4. Location of the job jar
        job.setJar("C:\\Users\\Hailong\\Desktop\\w2.jar");
        job.setMapperClass(MapTask.class);
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path(args[1]);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // 5. Map-only job: the join happens entirely in the mappers, and JoinBean
        // implements Writable but not WritableComparable, so it cannot be shuffled as a key
        job.setNumReduceTasks(0);
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion ? "Job finished, all good!!!" : "Job failed, time to start debugging!!!");
    }
}
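The driver is written to be launched from Eclipse (note the Windows path in job.setJar), so the three arguments come from the run configuration. For illustration, with hypothetical HDFS paths the arguments would look like:

    args[0] = /data/ratings     (big table: the ratings file, fed to the mappers line by line)
    args[1] = /data/out/join    (output directory; deleted first if it already exists)
    args[2] = /data/users.dat   (small table: the user file each mapper caches in setup())

Because each mapper holds the whole small table in memory, this map-side join only works when the user file comfortably fits in a task's heap; joining two large files needs a reduce-side join, which is presumably what the otherwise-unused table field of JoinBean was meant to mark.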