Relational Algebra Operations
MapReduce is well suited to relational algebra because these operations exhibit little data dependence, which makes them easy to parallelize as MapReduce algorithms.
The common relational algebra operations, namely selection, projection, union, intersection, difference, and natural join, can all be parallelized with MapReduce in a straightforward way.
Selection
Selection applies a condition C to a relation R, for example: query all students whose score is greater than 90. In the Map phase we simply test each input record against the condition and emit the records that satisfy it; the Reduce phase has no extra work to do, so the job below runs with zero reduce tasks. (The sample implementation only supports equality tests on a single column; other predicates follow the same pattern.)
The code:
package cn.zzuli.zcs0;
/**
 * Created by 张超帅 on 2018/8/16.
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Represents one tuple of the relation R(id, name, age, weight).
 * @author KING
 */
public class RelationA implements WritableComparable<RelationA> {
    private int id;
    private String name;
    private int age;
    private double weight;

    public RelationA() {}

    public RelationA(int id, String name, int age, double weight) {
        this.setId(id);
        this.setName(name);
        this.setAge(age);
        this.setWeight(weight);
    }

    /** Parses one CSV input line of the form "id,name,age,weight". */
    public RelationA(String line) {
        String[] value = line.split(",");
        this.setId(Integer.parseInt(value[0]));
        this.setName(value[1]);
        this.setAge(Integer.parseInt(value[2]));
        this.setWeight(Double.parseDouble(value[3]));
    }

    /** Returns true if this record's value in column col equals the given value. */
    public boolean isCondition(int col, String value) {
        if (col == 0 && Integer.parseInt(value) == this.id)
            return true;
        else if (col == 1 && name.equals(value))
            return true;
        else if (col == 2 && Integer.parseInt(value) == this.age)
            return true;
        else if (col == 3 && Double.parseDouble(value) == this.weight)
            return true;
        else
            return false;
    }

    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public double getWeight() {
        return weight;
    }
    public void setWeight(double weight) {
        this.weight = weight;
    }

    /** Returns the value of the given column as a string. */
    public String getCol(int col) {
        switch (col) {
            case 0: return String.valueOf(id);
            case 1: return name;
            case 2: return String.valueOf(age);
            case 3: return String.valueOf(weight);
            default: return null;
        }
    }

    /** Returns all column values except the given one, joined by commas. */
    public String getValueExcept(int col) {
        switch (col) {
            case 0: return name + "," + age + "," + weight;
            case 1: return id + "," + age + "," + weight;
            case 2: return id + "," + name + "," + weight;
            case 3: return id + "," + name + "," + age;
            default: return null;
        }
    }

    @Override
    public String toString() {
        return id + "," + name + "," + age + "," + weight;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeInt(age);
        out.writeDouble(weight);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        age = in.readInt();
        weight = in.readDouble();
    }

    @Override
    public int compareTo(RelationA o) {
        if (id == o.getId() && name.equals(o.getName())
                && age == o.getAge() && weight == o.getWeight())
            return 0;
        else if (id < o.getId())
            return -1;
        else
            return 1;
    }

    /**
     * Records with identical field values must produce identical hash codes so
     * that the default HashPartitioner sends equal records to the same reducer;
     * the intersection and difference jobs below depend on this.
     */
    @Override
    public int hashCode() {
        int result = id;
        result = 31 * result + (name == null ? 0 : name.hashCode());
        result = 31 * result + age;
        long bits = Double.doubleToLongBits(weight);
        result = 31 * result + (int) (bits ^ (bits >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof RelationA))
            return false;
        RelationA o = (RelationA) obj;
        return id == o.id && name.equals(o.name)
                && age == o.age && Double.compare(weight, o.weight) == 0;
    }
}
package cn.zzuli.zcs0;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Created by 张超帅 on 2018/8/16.
 */
public class Selection {
    public static class SelectionMap extends Mapper<LongWritable, Text, RelationA, NullWritable> {
        private int id;
        private String value;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The column index and the comparison value are passed in via the configuration.
            id = context.getConfiguration().getInt("col", 0);
            value = context.getConfiguration().get("value");
        }

        @Override
        protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
            RelationA record = new RelationA(line.toString());
            // Emit only the records that satisfy the selection condition.
            if (record.isCondition(id, value))
                context.write(record, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.setInt("col", Integer.parseInt(args[2]));
        conf.set("value", args[3]);
        Job selectionJob = Job.getInstance(conf, "selectionJob");
        selectionJob.setJarByClass(Selection.class);
        selectionJob.setMapperClass(SelectionMap.class);
        selectionJob.setMapOutputKeyClass(RelationA.class);
        selectionJob.setMapOutputValueClass(NullWritable.class);
        // Map-only job: selection needs no reduce phase.
        selectionJob.setNumReduceTasks(0);
        selectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(selectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(selectionJob, new Path(args[1]));
        selectionJob.waitForCompletion(true);
        System.out.println("Finished!");
    }
}
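A quick usage sketch (the jar name and paths here are hypothetical): given CSV input lines of the form "id,name,age,weight", such as 1,Alice,20,55.5, selecting every record whose age column equals 20 could be run as

hadoop jar relalg.jar cn.zzuli.zcs0.Selection /input/R /output/selection 2 20

where args[2] = 2 is the column index (age) and args[3] = 20 is the value to match.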
Projection
For example, apply a projection to relation R to obtain all values of the attribute age. In the Map phase we simply emit each record's value on that attribute as the key, paired with a NullWritable placeholder as the value. On the Reduce side we then emit each incoming key exactly once. Note that the projection therefore also deduplicates the result, since identical attribute values are grouped at the same reducer.
package cn.zzuli.zcs0;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Created by 张超帅 on 2018/8/16.
 */
public class Projection {
    public static class ProjectionMap extends Mapper<LongWritable, Text, Text, NullWritable> {
        private int col;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            col = context.getConfiguration().getInt("col", 0);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            RelationA record = new RelationA(value.toString());
            // The projected attribute becomes the key; the value carries no information.
            context.write(new Text(record.getCol(col)), NullWritable.get());
        }
    }

    public static class ProjectionReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Identical attribute values are grouped under one key, so writing
            // the key once per group deduplicates the projection result.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.setInt("col", Integer.parseInt(args[2]));
        Job projectionJob = Job.getInstance(conf, "projectionJob");
        projectionJob.setJarByClass(Projection.class);
        projectionJob.setMapperClass(ProjectionMap.class);
        projectionJob.setReducerClass(ProjectionReduce.class);
        projectionJob.setMapOutputKeyClass(Text.class);
        projectionJob.setMapOutputValueClass(NullWritable.class);
        projectionJob.setInputFormatClass(TextInputFormat.class);
        projectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(projectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(projectionJob, new Path(args[1]));
        projectionJob.waitForCompletion(true);
        System.out.println("finished!");
    }
}
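Since the reducer simply emits each distinct key once, the same class can also be registered as a combiner to shrink the shuffled data. This is an optional tweak, not part of the original job:

projectionJob.setCombinerClass(ProjectionReduce.class);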
Intersection
The main idea behind computing the intersection of two tables is as follows: given a relation T with the same schema as a relation R, to obtain the intersection of R and T we emit, in the Map phase, the key-value pair (r, 1) for every record r in R and in T.
In the Reduce phase we sum the counts for each record and output the record exactly when the sum is 2 (this assumes neither relation contains duplicate records). One point needs extra attention here: a record is correctly recognized as belonging to the intersection only if its identical copies from R and T are sent to the same Reduce node, so we must guarantee that equal records end up on the same reducer. Since the implementation uses a RelationA object as the key, MapReduce by default partitions the map output among reducers by the key's hashCode value. We therefore have to override hashCode() in the custom class so that objects with equal field values always yield equal hash codes; this is exactly what the hashCode() method over all four fields in the RelationA class above does.
package cn.zzuli.zcs0;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Created by 张超帅 on 2018/8/16.
 */
public class Intersection {
    public static class IntersectionMap extends Mapper<LongWritable, Text, RelationA, IntWritable> {
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit (record, 1) for every record of R and of T.
            RelationA record = new RelationA(value.toString());
            context.write(record, one);
        }
    }

    public static class IntersectionReduce extends Reducer<RelationA, IntWritable, RelationA, NullWritable> {
        @Override
        protected void reduce(RelationA key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // A count of 2 means the record appeared in both R and T.
            if (sum == 2)
                context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job intersectionJob = Job.getInstance(conf, "Intersection");
        intersectionJob.setJarByClass(Intersection.class);
        intersectionJob.setMapperClass(IntersectionMap.class);
        intersectionJob.setReducerClass(IntersectionReduce.class);
        intersectionJob.setMapOutputKeyClass(RelationA.class);
        intersectionJob.setMapOutputValueClass(IntWritable.class);
        intersectionJob.setOutputKeyClass(RelationA.class);
        intersectionJob.setOutputValueClass(NullWritable.class);
        intersectionJob.setInputFormatClass(TextInputFormat.class);
        intersectionJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(intersectionJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(intersectionJob, new Path(args[1]));
        intersectionJob.waitForCompletion(true);
        System.out.println("OVER");
    }
}
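Because the reduce step only sums partial counts, a map-side combiner can pre-aggregate them. A minimal sketch (the class name is mine, and it still assumes duplicate-free input relations):

public static class IntersectionCombiner extends Reducer<RelationA, IntWritable, RelationA, IntWritable> {
    @Override
    protected void reduce(RelationA key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values)
            sum += val.get();
        // Forward the partial count; the reducer adds the partials up to at most 2.
        context.write(key, new IntWritable(sum));
    }
}

It would be registered with intersectionJob.setCombinerClass(IntersectionCombiner.class); before submitting the job.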
Difference
For example, to compute R - T, that is, to find the records that exist in R but not in T, the Map phase emits the key-value pair (r, R) for each record r in R and (r, T) for each record r in T; in the implementation below the tag is simply the name of the input file the record came from. The Reduce phase then inspects the full list of tags attached to a record r and outputs the record only if the list contains R and no T. As with intersection, equal records must be routed to the same Reduce node, which again relies on the hashCode() override in RelationA.
package cn.zzuli.zcs0;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Created by 张超帅 on 2018/8/16.
 */
public class Difference {
    public static class DifferenceMap extends Mapper<Object, Text, RelationA, Text> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Tag each record with the name of the file (i.e. the relation) it came from.
            FileSplit split = (FileSplit) context.getInputSplit();
            String filename = split.getPath().getName();
            RelationA relation = new RelationA(value.toString());
            context.write(relation, new Text(filename));
        }
    }

    public static class DifferenceReduce extends Reducer<RelationA, Text, RelationA, NullWritable> {
        private String setR;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The file name of relation R is passed in via the configuration.
            setR = context.getConfiguration().get("setR");
        }

        @Override
        protected void reduce(RelationA key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The record belongs to R - T only if every tag comes from R,
            // i.e. the record never appeared in T.
            boolean onlyInR = true;
            for (Text val : values) {
                if (!val.toString().equals(setR))
                    onlyInR = false;
            }
            if (onlyInR)
                context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("setR", args[2]);
        Job differenceJob = Job.getInstance(conf, "differenceJob");
        differenceJob.setJarByClass(Difference.class);
        differenceJob.setMapperClass(DifferenceMap.class);
        differenceJob.setReducerClass(DifferenceReduce.class);
        differenceJob.setMapOutputKeyClass(RelationA.class);
        differenceJob.setMapOutputValueClass(Text.class);
        differenceJob.setOutputKeyClass(RelationA.class);
        differenceJob.setOutputValueClass(NullWritable.class);
        differenceJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(differenceJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(differenceJob, new Path(args[1]));
        differenceJob.waitForCompletion(true);
        System.out.println("Over");
    }
}
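A usage sketch (jar name and paths hypothetical): if R lives in /input/diff/R.txt and T in /input/diff/T.txt, computing R - T could look like

hadoop jar relalg.jar cn.zzuli.zcs0.Difference /input/diff /output/difference R.txt

where args[2] = R.txt tells the reducer which file name identifies relation R.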
Natural Join
For example, suppose we want to compute the natural join of relations R and S on the attribute ID. In the Map phase, for every record r in R or S we emit its ID value as the key and, as the value, the remaining attributes together with the name of the relation the record came from (R or S). In the Reduce phase, for each key we split the values into two groups according to their origin (R or S), take the Cartesian product of the two groups, and emit the joined records. A sketch of this reduce-side join follows.
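A minimal sketch of this reduce-side join, under the following assumptions (mine, not from the original): the class and file names are hypothetical, both inputs are CSV files whose first column is the join attribute ID (for R this is the RelationA format id,name,age,weight), and args[2] names the file that holds R:

package cn.zzuli.zcs0;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class NaturalJoin {
    public static class NaturalJoinMap extends Mapper<Object, Text, IntWritable, Text> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Tag each record with the file (relation) it came from.
            String filename = ((FileSplit) context.getInputSplit()).getPath().getName();
            // The join attribute ID is the first CSV column; the rest is the payload.
            String[] parts = value.toString().split(",", 2);
            context.write(new IntWritable(Integer.parseInt(parts[0])),
                    new Text(filename + ":" + parts[1]));
        }
    }

    public static class NaturalJoinReduce extends Reducer<IntWritable, Text, Text, Text> {
        private String setR;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            setR = context.getConfiguration().get("setR");
        }

        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Split the tuples sharing this ID into the two source relations.
            List<String> fromR = new ArrayList<String>();
            List<String> fromS = new ArrayList<String>();
            for (Text val : values) {
                String[] tagged = val.toString().split(":", 2);
                if (tagged[0].equals(setR))
                    fromR.add(tagged[1]);
                else
                    fromS.add(tagged[1]);
            }
            // The join result for this ID is the Cartesian product of the two groups.
            for (String r : fromR)
                for (String s : fromS)
                    context.write(new Text(key.toString()), new Text(r + "," + s));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("setR", args[2]); // file name of relation R, e.g. "R.txt"
        Job joinJob = Job.getInstance(conf, "naturalJoinJob");
        joinJob.setJarByClass(NaturalJoin.class);
        joinJob.setMapperClass(NaturalJoinMap.class);
        joinJob.setReducerClass(NaturalJoinReduce.class);
        joinJob.setMapOutputKeyClass(IntWritable.class);
        joinJob.setMapOutputValueClass(Text.class);
        joinJob.setOutputKeyClass(Text.class);
        joinJob.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(joinJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(joinJob, new Path(args[1]));
        joinJob.waitForCompletion(true);
    }
}

Partitioning by the IntWritable join key already guarantees that all tuples with the same ID meet at one reducer, so no custom hashCode is needed here; the buffered lists do assume that the set of tuples per join key fits in memory.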