数据
(1)电影评分数据
用户ID::电影ID::评分::时间
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039…….
(2)用户数据
用户ID::性别::年龄(后面可忽略)
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117……
需求
将两个文件合并为一个文件,以用户ID作为外键,合并格式为:
用户ID 性别 年龄 电影ID 评分 时间
代码
两种方法使用同一个bean(注意:方法一还需要bean带有 table 字段来区分数据来源,并在序列化时一并写出,见下文代码)
Bean部分代码
/**
 * Holds one joined record: user columns (uid/age/gender) plus rating
 * columns (movieId/rating), serialized via the Hadoop Writable contract.
 *
 * NOTE(review): the reduce-side join (method one) calls a 6-argument
 * set(...) and getTable(), but this bean originally had no "table" field
 * at all — the source tag could never survive the shuffle. The field is
 * added here and included in write()/readFields(); the 5-argument set()
 * is kept so the map-side join (method two) is unaffected.
 */
public class JoinBean implements WritableComparable<JoinBean> {
    private String uid;
    private String age;
    private String gender;
    private String movieId;
    private String rating;
    // Tag identifying the source file of this record: "user", "rating",
    // or "" when set via the 5-argument setter (map-side join).
    private String table = "";

    /**
     * 5-argument setter kept for the map-side join; the table tag
     * defaults to "" so write() never sees a null (writeUTF(null) throws).
     */
    public void set(String uid, String age, String gender, String movieId, String rating) {
        set(uid, age, gender, movieId, rating, "");
    }

    /** Full setter including the source-table tag used by the reduce-side join. */
    public void set(String uid, String age, String gender, String movieId, String rating, String table) {
        this.uid = uid;
        this.age = age;
        this.gender = gender;
        this.movieId = movieId;
        this.rating = rating;
        this.table = table;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Field order must mirror write() exactly.
        uid = in.readUTF();
        age = in.readUTF();
        gender = in.readUTF();
        movieId = in.readUTF();
        rating = in.readUTF();
        table = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(uid);
        out.writeUTF(age);
        out.writeUTF(gender);
        out.writeUTF(movieId);
        out.writeUTF(rating);
        // Serialize the tag too, otherwise the reducer cannot tell the
        // user record apart from the rating records.
        out.writeUTF(table);
    }

    /**
     * Orders by uid, then movieId. The original returned the stub value 0,
     * which makes every bean "equal" during the framework's key sort.
     */
    @Override
    public int compareTo(JoinBean o) {
        int byUid = uid.compareTo(o.uid);
        return byUid != 0 ? byUid : movieId.compareTo(o.movieId);
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public String getRating() {
        return rating;
    }

    public void setRating(String rating) {
        this.rating = rating;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Emitted by TextOutputFormat as the final record. Matches the required
     * output layout 用户ID 性别 年龄 电影ID 评分 (the timestamp column is not
     * carried by this bean); the original debug-style "JoinBean [...]" text
     * did not satisfy the stated requirement.
     */
    @Override
    public String toString() {
        return uid + "\t" + gender + "\t" + age + "\t" + movieId + "\t" + rating;
    }
}
(1)方法一,在本地执行,使用一个MapReduce实现
/**
 * 方法一: reduce-side join. The mapper tags each line with its source file
 * and keys everything by uid; the reducer stitches the single user record
 * onto every rating record for that uid.
 */
public class JoinMR {

    /** Tags each input line by source file and emits (uid, JoinBean). */
    public static class MapTask extends Mapper<LongWritable, Text, Text, JoinBean> {
        String table;
        // Reused output objects: the original allocated a new Text (and bean)
        // per record; Hadoop serializes on write(), so reuse is safe.
        private final Text outKey = new Text();
        private final JoinBean joinBean = new JoinBean();

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, JoinBean>.Context context)
                throws IOException, InterruptedException {
            // The file name tells us which table this split belongs to.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            table = fileSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JoinBean>.Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("::");
            // Guard: blank/malformed lines would otherwise throw
            // ArrayIndexOutOfBoundsException below.
            if (split.length < 3) {
                return;
            }
            if (table.startsWith("users")) {
                // users.dat: uid::gender::age::... — rating columns padded with "null"
                joinBean.set(split[0], split[2], split[1], "null", "null", "user");
            } else {
                // ratings.dat: uid::movieId::rating::time — user columns padded with "null"
                joinBean.set(split[0], "null", "null", split[1], split[2], "rating");
            }
            outKey.set(joinBean.getUid());
            context.write(outKey, joinBean);
        }
    }

    /** Joins the single user record for a uid onto each of its rating records. */
    public static class ReduceTask extends Reducer<Text, JoinBean, JoinBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<JoinBean> values,
                Reducer<Text, JoinBean, JoinBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Holds the user-side columns for this uid.
            JoinBean user = new JoinBean();
            boolean hasUser = false;
            // Holds the rating-side records for this uid.
            ArrayList<JoinBean> ratings = new ArrayList<>();
            for (JoinBean v : values) {
                // Hadoop reuses the value object between iterations, so the
                // fields must be copied out rather than the reference kept.
                if ("user".equals(v.getTable())) {
                    user.set(v.getUid(), v.getAge(), v.getGender(), v.getMovieId(), v.getRating(), v.getTable());
                    hasUser = true;
                } else {
                    JoinBean copy = new JoinBean();
                    copy.set(v.getUid(), v.getAge(), v.getGender(), v.getMovieId(), v.getRating(), v.getTable());
                    ratings.add(copy);
                }
            }
            // Inner join: skip uids with no user record. The original would
            // have emitted rating rows with null age/gender here.
            if (!hasUser) {
                return;
            }
            for (JoinBean r : ratings) {
                r.setAge(user.getAge());
                r.setGender(user.getGender());
                context.write(r, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "join");
        // Wire up mapper, reducer and the jar to ship.
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(JoinMR.class);
        // Map output and final output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(JoinBean.class);
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Local input/output directories.
        FileInputFormat.addInputPath(job, new Path("d:/data/in/join"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\data\\out\\join"));
        // Remove a stale output directory so the job does not fail on startup.
        File file = new File("d:\\data\\out\\join");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        // Submit and wait.
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion ? "你很优秀!!!" : "滚去调bug!!");
    }
}
(2)方法二,在集群上执行,map端join:仅用Map阶段实现,把小表(users.dat)在setup中整体缓存到内存,逐行处理大表(ratings.dat)时直接查表拼接,无需shuffle/reduce
/**
 * 方法二: map-side join. The small table (users.dat) is loaded into an
 * in-memory map once per task in setup(); each ratings.dat line is then
 * joined by a simple lookup, so no shuffle/reduce phase is needed.
 */
public class JoinMR {

    public static class MapTask extends Mapper<LongWritable, Text, JoinBean, NullWritable> {
        // uid -> raw users.dat line, populated once in setup().
        Map<String, String> map = new HashMap<>();

        @Override
        protected void setup(Mapper<LongWritable, Text, JoinBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // Cache the small table from HDFS into memory.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            Path smallTable = new Path(context.getConfiguration().get("smallTableName"));
            // try-with-resources: the original leaked the reader and the
            // underlying FSDataInputStream.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(smallTable)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] split = line.split("::");
                    map.put(split[0], line);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, JoinBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // ratings.dat: uid::movieId::rating::time
            String[] split = value.toString().split("::");
            String userLine = map.get(split[0]);
            // Guard: a uid absent from users.dat made the original throw an
            // NPE on .split(); an inner join simply drops such records.
            if (userLine == null) {
                return;
            }
            // users.dat: uid::gender::age::...
            String[] user = userLine.split("::");
            JoinBean joinBean = new JoinBean();
            joinBean.set(split[0], user[2], user[1], split[1], split[2]);
            context.write(joinBean, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        // args[2]: HDFS path of the small table (users.dat).
        conf.set("smallTableName", args[2]);
        Job job = Job.getInstance(conf, "join");
        // Wire up the mapper and the jar to ship.
        job.setMapperClass(MapTask.class);
        job.setJarByClass(JoinMR.class);
        // Map-only job: skip the pointless shuffle/sort and reduce phase the
        // original ran with the default single reducer.
        job.setNumReduceTasks(0);
        // Output types (map output IS the final output here).
        job.setOutputKeyClass(JoinBean.class);
        job.setOutputValueClass(NullWritable.class);
        // args[0]: big-table input dir; args[1]: output dir.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Remove a stale output directory so the job does not fail on startup.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        // Submit and wait.
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion ? "你很优秀!!!" : "滚去调bug!!");
    }
}