2020.9.10 Class Notes (Implementing WordCount with MapReduce in IDEA)

MapReduce basics review:

1. MapReduce flow

	Task execution:      job --> TaskTracker --> Map/Reduce Task
	Resource scheduling: resource manager --> app master --> container --> scheduler (execution plan)
	Data flow:           split --> map --> combiner --> partitioner --> reducer --> resource manager

2. Differences between Hadoop 1.0 and Hadoop 2.0
In 1.0, both resource management and computation are handled by MapReduce.
In 2.0, resource management is handled by YARN, which not only takes the management load off MapReduce but also makes the computing framework extensible.
3. Core logic of the Driver class
Create a Job object --> set the Driver class as the driver class --> set the Mapper and Reducer (and optionally a Partitioner) --> set the output types of the Mapper and Reducer --> set the input and output file paths --> run
4. Logic of joining files
The Mapper side merges the files and, according to the business logic, groups records that share the same key.
The Reducer side joins records by their common key (see the sketch below).
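A minimal sketch of that reduce-side join idea (my own illustration, not the lecture code; the class names, the "order:"/"user:" tags, the comma-separated input format, and the use of FileSplit to tell the two input files apart are all assumptions):

package cn.kgc.kb09.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinExample {

    // Mapper side: tag each record with the file it came from and key it by the join field
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String tag = fileName.startsWith("order") ? "order:" : "user:";
            String[] fields = value.toString().split(",");
            // assume fields[0] is the join key in both files
            context.write(new Text(fields[0]), new Text(tag + value.toString()));
        }
    }

    // Reducer side: all records with the same key arrive together; separate them by tag and join
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> orders = new ArrayList<>();
            List<String> users = new ArrayList<>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("order:")) {
                    orders.add(s.substring("order:".length()));
                } else {
                    users.add(s.substring("user:".length()));
                }
            }
            // emit every combination of order record and user record that shares this key
            for (String o : orders) {
                for (String u : users) {
                    context.write(key, new Text(o + "\t" + u));
                }
            }
        }
    }
}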

Implementing the WordCount word-frequency count in IDEA:

Requirements analysis:
1. Map phase: read the text in parallel and map every word that is read, emitting each word as a <key,value> pair.
2. Reduce phase: sort and merge the map output and finally produce the word frequencies.

1. Add the Maven dependencies

Note that the hadoop-core dependency conflicts with hadoop-auth, so it has to be commented out first.

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0</version>
    </dependency>
<!--    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.0</version>
    </dependency>-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.6.0</version>
    </dependency>
  </dependencies>

2. Write the Mapper class

1. Convert the text to a string.
2. Split the string by the given delimiter.
3. Loop over the resulting words and write each one out.

package cn.kgc.kb09.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @Author: ChaoKeAiMuZhi
 * @Date: 2020/9/10 9:18
 * @Description:
 *  KEYIN:    input key type (the byte offset of the line)
 *  VALUEIN:  input value type (one line of text)
 *  KEYOUT:   output key type (the word)
 *  VALUEOUT: output value type (the count 1)
 **/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // print the byte offset of the current line (debugging only)
        System.out.println(key);
        //1. Convert the text to a string
        String line = value.toString();
        //2. Split the string by the given delimiter (a single space here)
        String[] words = line.split(" ");
        //3. Loop over the words and write each one out as <word, 1>
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
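A small optional variation (my own, not from the lecture; the class name WCMapperReuse is made up): since map() is called once per input line, a common Hadoop idiom is to reuse the output Writable objects instead of allocating a new Text and IntWritable for every word. context.write() serializes the pair right away, so the reuse is safe.

package cn.kgc.kb09.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WCMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
    // reused across map() calls to avoid creating two new objects per word
    private final Text outKey = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            outKey.set(word);
            context.write(outKey, one);   // the pair is serialized here, so reuse is safe
        }
    }
}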

3. Write the Reducer class

The reduce side receives a key plus an iterator of values.
1. Sum up the values in the iterator.
2. Write out the key and the total.

package cn.kgc.kb09.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @Author: ChaoKeAiMuZhi
 * @Date: 2020/9/10 9:39
 * @Description:
 **/
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // the reduce side receives something like (wish, (1,1,1,1))
        // sum up the values in the iterator
        // total must be initialized to 0 because reduce() is called once per key
        int total = 0;
        for (IntWritable value : values) {
            total += value.get();
        }
        // write out the key and the total
        context.write(key, new IntWritable(total));
    }
}
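Side note (my own addition, not part of the lecture code): because this reduction just adds numbers, which is associative and commutative, the same WCReducer can also be registered as a combiner so that the <word,1> pairs are pre-aggregated on the map side before the shuffle. In the Driver you would add one line after setReducerClass:

// optional: pre-aggregate on the map side; safe here because addition is associative and commutative
job.setCombinerClass(WCReducer.class);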

4. Optionally, write a Partitioner class for partitioning

1. Partition by the hashCode of the key.

package cn.kgc.kb09.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @Author: ChaoKeAiMuZhi
 * @Date: 2020/9/10 12:23
 * @Description:
 **/
public class WCPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // choose a reducer based on the key's hashCode; Math.abs guards against a negative hashCode
        return Math.abs(text.hashCode()) % numPartitions;
    }
}
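For reference, a quick local check (my own example, assuming the imports of the class above inside any plain main method): with 4 reduce tasks every key lands in one of the partitions 0 to 3, which is essentially what Hadoop's built-in HashPartitioner does.

WCPartitioner p = new WCPartitioner();
// with numPartitions = 4 (matching job.setNumReduceTasks(4) in the Driver below),
// "wish" lands in partition Math.abs(new Text("wish").hashCode()) % 4, i.e. some value in 0..3
int partition = p.getPartition(new Text("wish"), new IntWritable(1), 4);
System.out.println(partition);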

5. Write the Driver class to run the Job

1. Create the configuration and create the Job.
2. Set the jar location; the argument is the driver class itself, e.g. WCDriver.class.
3. Set the Mapper and Reducer classes.
4. Set the key and value types of the map output.
5. Set the key and value types of the reduce output.
6. Set the input and output paths; the input is a local txt file you created yourself, and the job writes a test output directory.
7. Submit the program and run it.

package cn.kgc.kb09.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @Author: ChaoKeAiMuZhi
 * @Date: 2020/9/10 9:47
 * @Description:
 **/
public class WCDriver {
    public static void main(String[] args) throws Exception {
        //1. Create the configuration and the Job
        Configuration cfg = new Configuration();
        Job job = Job.getInstance(cfg, "job_wc");
        job.setJarByClass(WCDriver.class);
        //2. Specify the mapper and reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // specify the mapper's output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // set the partitioner (the number of reduce tasks must match the number of partitions)
        job.setNumReduceTasks(4);
        job.setPartitionerClass(WCPartitioner.class);
        // specify the reducer's output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // specify the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //3. Run
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "success" : "failure");
        System.exit(result ? 0 : 1);
    }
}
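To actually run it (my own note; the paths below are hypothetical examples): in IDEA, pass the input file and the output directory as Program arguments in the run configuration, or call the driver directly. The output directory must not exist yet, otherwise FileOutputFormat aborts the job with a FileAlreadyExistsException.

// equivalent to Program arguments: input/wc.txt output/test   (example paths, adjust to your own)
// "output/test" must not exist before the run
WCDriver.main(new String[]{"input/wc.txt", "output/test"});

With setNumReduceTasks(4) in the Driver, the result is split across the files part-r-00000 to part-r-00003 inside the output directory.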


Reposted from blog.csdn.net/m0_48758256/article/details/108526665