1. Upload the input files to HDFS. Each line of an input file describes one matrix row in the format row<TAB>col_value,col_value,... (a sketch of the upload step follows the two matrices below).
Right matrix (matrix2.txt):
1	1_0,2_5,3_-1,4_3,5_-3
2	1_1,2_7,3_2,4_-2,5_-2
3	1_2,2_0,3_5,4_-1,5_3
4	1_-2,2_4,3_-1,4_1,5_1
Left matrix (matrix1.txt):
1	1_1,2_2,3_3,4_0
2	1_3,2_3,3_4,4_-4
3	1_-2,2_0,3_1,4_3
4	1_4,2_3,3_-1,4_2
5	1_-4,2_3,3_0,4_2
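A minimal sketch of the upload step, assuming the two files sit in a local directory F:\data (the local paths are placeholders) and the same NameNode address hdfs://node1:9000 that the job drivers below use; the hdfs dfs -put command works just as well:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Upload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:9000");
        FileSystem fs = FileSystem.get(conf);
        // right matrix: input of step 1 (the transpose job)
        fs.copyFromLocalFile(new Path("F:\\data\\matrix2.txt"),
                new Path("/matrix/step1_input/matrix2.txt"));
        // left matrix: input of step 2 (the multiplication job)
        fs.copyFromLocalFile(new Path("F:\\data\\matrix1.txt"),
                new Path("/matrix/step2_input/matrix1.txt"));
        fs.close();
    }
}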
2. Write the first MapReduce program, which transposes the right matrix (rows become columns, columns become rows).
Mapper1.java:
package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018-05-13 22:36
 * @email [email protected]
 *
 * Matrix multiplication, step 1: transpose the right matrix
 * (rows become columns, columns become rows).
 */
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    /**
     * key:   byte offset of the line (unused)
     * value: one row of the right matrix, e.g. "1\t1_0,2_5,3_-1,4_3,5_-3"
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] rowAndLine = value.toString().split("\t");
        // row number of the matrix
        String row = rowAndLine[0];
        // the "column_value" entries of that row
        String[] lines = rowAndLine[1].split(",");
        for (int i = 0; i < lines.length; i++) {
            String column = lines[i].split("_")[0];
            String valueStr = lines[i].split("_")[1];
            // key: column number, value: rowNumber_value
            outKey.set(column);
            outValue.set(row + "_" + valueStr);
            context.write(outKey, outValue);
        }
    }
}
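For example, for the input line "1	1_0,2_5,3_-1,4_3,5_-3" (row 1 of the right matrix), this mapper emits (1, 1_0), (2, 1_5), (3, 1_-1), (4, 1_3) and (5, 1_-3). After the shuffle, each reducer key is a column number and its values are all the rowNumber_value entries of that column, which is exactly one row of the transposed matrix.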
Reducer1.java:
package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018-05-13 22:56
 * @email [email protected]
 *
 * Matrix multiplication, step 1: transpose the right matrix.
 */
public class Reducer1 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    // key: column number, values: rowNumber_value, rowNumber_value, ...
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        // each text is one "rowNumber_value" entry
        for (Text text : values) {
            sb.append(text).append(",");
        }
        // drop the trailing comma; reduce() is never called with an empty values list
        String line = sb.substring(0, sb.length() - 1);
        outKey.set(key);
        outValue.set(line);
        context.write(outKey, outValue);
    }
}
MR1.java (the driver):
package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author liyijie
 * @date 2018-05-13 23:07
 * @email [email protected]
 */
public class MR1 {
    private static String inputPath = "/matrix/step1_input/matrix2.txt";
    private static String outputPath = "/matrix/step1_output";
    private static String hdfs = "hdfs://node1:9000";

    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);
            Job job = Job.getInstance(conf, "step1");

            // configure the jar and the map/reduce classes
            job.setJarByClass(MR1.class);
            job.setJar("F:\\eclipseworkspace\\matrix\\matrix.jar");
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inpath = new Path(inputPath);
            if (fs.exists(inpath)) {
                FileInputFormat.addInputPath(job, inpath);
            } else {
                System.out.println(inpath + " does not exist");
            }

            // delete any previous output so the job can write its own
            Path outpath = new Path(outputPath);
            fs.delete(outpath, true);
            FileOutputFormat.setOutputPath(job, outpath);

            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR1().run();
        if (result == 1) {
            System.out.println("step1 finished successfully");
        } else {
            System.out.println("step1 failed");
        }
    }
}
Running MR1 produces the transposed right matrix.
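Worked out by hand from the input above, /matrix/step1_output/part-r-00000 should contain one line per column of the original right matrix (the order of the rowNumber_value entries within a line may vary, since Reducer1 concatenates the values in whatever order the shuffle delivers them):

1	1_0,2_1,3_2,4_-2
2	1_5,2_7,3_0,4_4
3	1_-1,2_2,3_5,4_-1
4	1_3,2_-2,3_-1,4_1
5	1_-3,2_-2,3_3,4_1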
3. Write the second MapReduce program, which loads the transposed right matrix from step 1 into the distributed cache and multiplies the left matrix by the right matrix. Transposing first pays off here: each line of the cached file holds a complete column of the right matrix, so a mapper can compute a full dot product from one line of the left matrix and one line of the cache.
Mapper2.java:
package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018-05-13 23:43
 * @email [email protected]
 *
 * Matrix multiplication, step 2: multiply the left matrix by the right matrix.
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    // right matrix: column number -> that column's values, indexed by row number - 1
    private Map<String, String[]> cacheMap = new HashMap<>();

    /**
     * Runs once, before any map() call: read the transposed right matrix
     * from the distributed cache into an in-memory map.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // "matrix2" is the symlink created for the cache file in MR2
        BufferedReader br = new BufferedReader(new FileReader("matrix2"));
        // each line of the transposed right matrix: "column\trow_value,row_value,..."
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] columnAndLine_matrix2 = line.split("\t");
            String key = columnAndLine_matrix2[0];
            String[] row_value_array_matrix2 = columnAndLine_matrix2[1].split(",");
            String[] row_value_list_matrix2 = new String[row_value_array_matrix2.length];
            for (int i = 0; i < row_value_array_matrix2.length; i++) {
                String[] split = row_value_array_matrix2[i].split("_");
                String row = split[0];
                String value = split[1];
                // index by row number, so the entries need not arrive sorted
                row_value_list_matrix2[Integer.parseInt(row) - 1] = value;
            }
            cacheMap.put(key, row_value_list_matrix2);
        }
        // closing the BufferedReader also closes the underlying FileReader
        br.close();
    }

    /**
     * key:   byte offset of the line (unused)
     * value: one row of the left matrix, "row\tcolumn_value,column_value,..."
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] rowAndLine_matrix1 = value.toString().split("\t");
        // row number of the left matrix
        String row_matrix1 = rowAndLine_matrix1[0];
        // the "column_value" entries of that row
        String[] column_value_array_matrix1 = rowAndLine_matrix1[1].split(",");
        // column numbers of the right matrix
        Set<String> columns_matrix2 = cacheMap.keySet();
        for (String column_matrix2 : columns_matrix2) {
            // dot product of one left-matrix row and one right-matrix column
            int result = 0;
            String[] row_value_list_matrix2 = cacheMap.get(column_matrix2);
            for (String column_value_matrix1 : column_value_array_matrix1) {
                String[] split = column_value_matrix1.split("_");
                int column_matrix1 = Integer.parseInt(split[0]);
                int v_matrix1 = Integer.parseInt(split[1]);
                // right-matrix entry at row column_matrix1, column column_matrix2
                int v_matrix2 = Integer.parseInt(row_value_list_matrix2[column_matrix1 - 1]);
                result += v_matrix1 * v_matrix2;
            }
            // result is the product-matrix element at
            // row row_matrix1, column column_matrix2
            outKey.set(row_matrix1);
            outValue.set(column_matrix2 + "_" + result);
            // output: key = row number, value = column_result
            context.write(outKey, outValue);
        }
    }
}
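For example, when map() receives row 1 of the left matrix, "1	1_1,2_2,3_3,4_0", it computes one dot product per cached column of the right matrix. For column 1 (values 0, 1, 2, -2 in rows 1 through 4) that is 1*0 + 2*1 + 3*2 + 0*(-2) = 8, so the pair (1, 1_8) is emitted, and likewise for columns 2 through 5.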
Reducer2.java:
package step2;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018-05-13 23:43
 * @email [email protected]
 */
public class Reducer2 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    // key: row number of the product matrix
    // values: "column_value" entries of that row, in arbitrary order
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // collect the values (each (row, column) element is computed exactly
        // once, so the set is only a defensive de-duplication)
        Set<String> values_set = new HashSet<>();
        for (Text text : values) {
            values_set.add(text.toString());
        }
        // sort by column number: each value is "column_value",
        // so place it at index column - 1
        String[] list = new String[values_set.size()];
        for (String value : values_set) {
            String[] split = value.split("_");
            int column = Integer.parseInt(split[0]);
            list[column - 1] = split[1];
        }
        // rebuild the row in column order: "1_v,2_v,..."
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < list.length; i++) {
            sb.append(i + 1).append("_").append(list[i]).append(",");
        }
        // drop the trailing comma
        String line = sb.substring(0, sb.length() - 1);
        outKey.set(key);
        outValue.set(line);
        context.write(outKey, outValue);
    }
}
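Continuing the example, the reducer for key 1 receives the values 1_8, 2_19, 3_18, 4_-4 and 5_2 in arbitrary order, places each value at index column - 1, and writes the line "1	1_8,2_19,3_18,4_-4,5_2" as row 1 of the product matrix.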
MR2.java (the driver):
package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author liyijie
 * @date 2018-05-13 23:44
 * @email [email protected]
 */
public class MR2 {
    private static String inputPath = "/matrix/step2_input/matrix1.txt";
    private static String outputPath = "/matrix/output";
    // the transposed matrix written by step 1, used as a distributed cache file
    private static String cache = "/matrix/step1_output/part-r-00000";
    private static String hdfs = "hdfs://node1:9000";

    public int run() {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", hdfs);
            Job job = Job.getInstance(conf, "step2");

            // enable symbolic links if they are not already enabled,
            // then verify that they are available
            FileSystem.enableSymlinks();
            boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
            System.out.println(areSymlinksEnabled);

            // add the distributed cache file; the "#matrix2" fragment makes it
            // readable as "matrix2" in the task's working directory
            // (addCacheFile rather than addCacheArchive, because part-r-00000
            // is a plain file, not an archive)
            job.addCacheFile(new URI(cache + "#matrix2"));

            // configure the jar and the map/reduce classes
            job.setJarByClass(MR2.class);
            job.setJar("F:\\eclipseworkspace\\matrix\\matrix.jar");
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            Path inpath = new Path(inputPath);
            if (fs.exists(inpath)) {
                FileInputFormat.addInputPath(job, inpath);
            } else {
                System.out.println(inpath + " does not exist");
            }

            // delete any previous output so the job can write its own
            Path outpath = new Path(outputPath);
            fs.delete(outpath, true);
            FileOutputFormat.setOutputPath(job, outpath);

            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException | ClassNotFoundException
                | InterruptedException | URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR2().run();
        if (result == 1) {
            System.out.println("step2 finished successfully");
        } else {
            System.out.println("step2 failed");
        }
    }
}
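Since step 2 reads step 1's output, the two drivers can also be chained from a single entry point. A minimal sketch, assuming both packages are built into the same jar (the class name Driver is my own):

package step2;

public class Driver {
    public static void main(String[] args) {
        // run the transpose job first; only multiply if it succeeded
        if (new step1.MR1().run() == 1) {
            System.out.println(new MR2().run() == 1
                    ? "step2 finished successfully" : "step2 failed");
        } else {
            System.out.println("step1 failed, skipping step2");
        }
    }
}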
Running MR2 produces the final result.
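Computed by hand from the two input matrices, the expected content of /matrix/output/part-r-00000 is:

1	1_8,2_19,3_18,4_-4,5_2
2	1_19,2_20,3_27,4_-5,5_-7
3	1_-4,2_2,3_4,4_-4,5_12
4	1_-3,2_49,3_-5,4_9,5_-19
5	1_-1,2_9,3_8,4_-16,5_8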