查询员工表(emp)中每个部门所有员工的工资总和。
SQL 语句为:select deptno , sum(sal) from emp group by deptno order by deptno;
下面用 MapReduce 程序实现与该 SQL 语句相同的查询功能。
Mapper程序如下:
package SalaryTotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Mapper of the per-department salary total job.
 *
 * <p>Each input line is one comma-separated EMP record, for example
 * {@code 7900,JAMES,CLERK,7698,1981/12/3,950,,30}. Field 5 (0-based) is the
 * salary and field 7 is the department number. For every record the mapper
 * emits exactly one pair: k2 = department number, v2 = salary.
 */
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    /** 0-based column index of the salary (SAL) field. */
    private static final int SAL_INDEX = 5;

    /** 0-based column index of the department number (DEPTNO) field. */
    private static final int DEPTNO_INDEX = 7;

    /**
     * Parses one EMP record and emits (department number, salary).
     *
     * @param key1    byte offset of the line in the input split (unused)
     * @param value1  one line of the input file
     * @param context Mapper context; its input comes from HDFS, and the pairs
     *                written to it go through the shuffle to the Reducer
     * @throws NumberFormatException if the salary or department field is not an integer
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String[] words = value1.toString().split(",");

        // String.split drops trailing empty fields, so a blank line or a record
        // without a department number ends up shorter than DEPTNO_INDEX + 1.
        // Skip such lines instead of failing the whole task.
        if (words.length <= DEPTNO_INDEX) {
            return;
        }

        int deptno = Integer.parseInt(words[DEPTNO_INDEX].trim());
        int sal = Integer.parseInt(words[SAL_INDEX].trim());

        // Emit exactly once per record. Writing inside a loop over the fields
        // would count each salary words.length times and inflate every total.
        context.write(new IntWritable(deptno), new IntWritable(sal));
    }
}
Reducer程序如下:
package SalaryTotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer of the per-department salary total job.
 *
 * <p>Receives every salary the mappers emitted for one department number and
 * writes a single (department number, total salary) pair.
 */
public class SalaryTotalReucer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    /**
     * Adds up all salaries grouped under one department.
     *
     * @param k3      the department number shared by this group
     * @param v3      the salaries emitted for that department
     * @param context used to write the (department number, total) result
     */
    @Override
    protected void reduce(IntWritable k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        int departmentTotal = 0;
        for (IntWritable salary : v3) {
            departmentTotal = departmentTotal + salary.get();
        }

        IntWritable totalWritable = new IntWritable(departmentTotal);
        context.write(k3, totalWritable);
    }
}
主程序如下:
package SalaryTotal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver of the per-department salary total job, the MapReduce equivalent of
 * {@code select deptno, sum(sal) from emp group by deptno order by deptno}.
 *
 * <p>Usage: {@code hadoop jar salary.jar <input path> <output path>}
 */
public class SalaryTotalMain {

    /**
     * Configures and runs the job, then exits with status 0 on success and
     * 1 on failure (2 on bad arguments), so shell scripts can detect failures.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     *             (the output path must not already exist)
     * @throws Exception if the job cannot be submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SalaryTotalMain <input path> <output path>");
            System.exit(2);
        }

        // Create the job.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SalaryTotalMain.class);

        // Mapper and its output types (k2 = deptno, v2 = salary).
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Summing is associative and the reducer's input/output types match,
        // so it can also run as a combiner to pre-aggregate on the map side
        // and shrink shuffle traffic without changing the result.
        job.setCombinerClass(SalaryTotalReucer.class);

        // Reducer and final output types (k4 = deptno, v4 = total salary).
        job.setReducerClass(SalaryTotalReucer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and propagate its outcome as the process exit status.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
在Hadoop上执行此MapReduce程序
hadoop jar salary.jar /scott/emp.csv /output/0814/salary
查看执行的结果
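由此结果可以看出,此 MapReduce 程序与 SQL 语句的执行结果一致,即 MapReduce 程序正确。之所以输出也按部门号排好了序,是因为 MapReduce 在 shuffle 阶段会按 key(这里是部门号)升序排序。在默认只有一个 Reducer 的情况下,输出结果天然按部门号有序,相当于 SQL 中的 order by deptno。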