MRWordCount

1-Map
package com.sdg.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by gao on 2018/6/25
 * This class is the map phase.
 * How do I know what my input types are? The default input format is TextInputFormat.
 * That component reads the data line by line: the starting byte offset of each line
 * becomes the key and the content of the line becomes the value, e.g. for
 *   hello hadoop
 *   hello
 * it produces <0,hello hadoop> and <14,hello>. So by default keyIn is a long offset,
 * and valueIn is naturally a String. For serialization, however, Hadoop considers the
 * JDK types (int, String, long) inadequate, so it wraps them in its own Writable types.
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Debug print of the raw input line
        System.out.println(value);
        String[] split = value.toString().split(" ");
        for (String word : split) {
            // The map phase writes its output through the task context
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
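One optional refinement, not from the original post: since context.write serializes the key and value as soon as it is called, Hadoop mappers conventionally reuse a single Text and LongWritable instance instead of allocating new objects for every word. A minimal sketch of that variant:

public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused across map() calls; safe because context.write copies the bytes immediately
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String token : value.toString().split(" ")) {
            word.set(token);
            context.write(word, one);
        }
    }
}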

2-Reduce
package com.sdg.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by gao on 2018/6/25
 * Getting the four generic types right is what matters here.
 */
public class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Debug print of the current key
        System.out.println(key);
        context.write(key, new LongWritable(count));
    }
}
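A side note, not in the original post: because this reduce function only sums counts (an associative and commutative operation), the same class can also serve as a combiner, pre-aggregating each map task's output locally and shrinking the shuffle. If desired, one extra line in the driver below enables it:

// Optional: reuse MyReducer as a combiner on the map side
job.setCombinerClass(MyReducer.class);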

3-Job Submission
package com.sdg.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by gao on 2018/6/25
 */
public class Job01 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // With no cluster configuration on the classpath the job runs in local mode;
        // with one it runs on the cluster.
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://vm2:9000");
        // The Job object assembles all the information relevant to this run
        Job job = Job.getInstance(conf);
        // The main class of this job
        job.setJarByClass(Job01.class);
        // The mapper and reducer classes for this job
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The key/value types output by the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // The final output key/value types of the whole MR job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Where the input data lives and where the results should go
        // (on a cluster these would simply be HDFS paths)
        FileInputFormat.setInputPaths(job, new Path("D:/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("D:/wordcount/output"));
        boolean b = job.waitForCompletion(false);

        // Exit with 0 on success, 1 on failure
        System.exit(b ? 0 : 1);
    }
}
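For reference (assumptions: the jar name below is hypothetical, and the input directory holds a file with the two sample lines from the mapper's comment, "hello hadoop" and "hello"), the job can be run straight from the IDE in local mode, or packaged and submitted with:

hadoop jar wordcount.jar com.sdg.wordcount.Job01

Note that FileOutputFormat refuses to overwrite an existing directory, so D:/wordcount/output must not exist before the run. Afterwards the result sits in D:/wordcount/output/part-r-00000 as tab-separated pairs:

hadoop	1
hello	2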


Reposted from www.cnblogs.com/sgjk/p/javaWordCount.html