注意事项:提交任务时经常出现权限问题。解决方法:在 Windows 上配置环境变量 HADOOP_USER_NAME,或者在代码中调用 System.setProperty("HADOOP_USER_NAME", "root") 指定提交任务的用户即可。从 Windows 提交到 Linux 集群时,还需要先把程序打包成 jar,放到指定路径(在代码中通过 job.setJar 指定)。
1.map
package nuc.edu.ls;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Word-count mapper: splits each input line into words and emits (word, 1) per occurrence.
 *
 * <p>The four generic type parameters are, in order:
 * <ul>
 *   <li>KEYIN - the line's offset within the input file (its starting position)</li>
 *   <li>VALUEIN - the type of one line of input data</li>
 *   <li>KEYOUT - the type of the output key</li>
 *   <li>VALUEOUT - the type of the output value</li>
 * </ul>
 *
 * <p>Hadoop's Writable types (e.g. LongWritable) are used instead of plain Java types
 * (e.g. long) because of serialization. Java's native serialization (Serializable or
 * Externalizable) is redundant: it stores the fully qualified class name and the type of
 * every single value. LongWritable and friends use Hadoop's own compact serialization.
 * Correspondence:
 * <pre>
 * Long    -> LongWritable
 * Integer -> IntWritable
 * String  -> Text
 * Float   -> FloatWritable
 * Double  -> DoubleWritable
 * null    -> NullWritable
 * </pre>
 */
public class MapTask extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Count emitted for every word occurrence; never mutated, so safe to share. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key; context.write serializes it immediately, so reuse is safe. */
    private final Text word = new Text();

    /**
     * Map phase: splits one line on runs of whitespace and emits (word, 1) for each word.
     *
     * @param key     offset of the line in the input (unused)
     * @param value   the text of one line
     * @param context sink for the (word, 1) pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            // split("\\s+") returns a leading "" when the line starts with whitespace and
            // [""] for an empty line; skip those so the empty string is never counted as a word.
            if (token.isEmpty()) {
                continue;
            }
            word.set(token);
            context.write(word, ONE);
        }
    }
}
2.reduce
package nuc.edu.ls;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ReduceTsk extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
int count=0;
for (IntWritable value : values) {
count=count+value.get();
//count++
}
context.write(key, new IntWritable(count));
}
}
3.提交任务
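job.setCombinerClass(ReduceTask.class); —— 设置 Combiner,在 map 端先对输出做局部合并,可以减少传输到 reduce 端的数据量、提高效率。但它并不是所有时候都起作用:框架不保证 Combiner 一定会执行(可能执行 0 次或多次),而且只有像求和这样满足交换律和结合律的操作,才能直接把 Reducer 当作 Combiner 使用。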
第一种:本地模式运行(提交到本地)
package nuc.edu.ls;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.pengpeng.day05.firstMR.MapTask;
import cn.pengpeng.day05.firstMR.ReduceTask;
/**
 * Local-mode driver.
 *
 * <p>Intended for testing on small data; once that works, switch to cluster mode for submission.
 * With the cluster settings below left commented out, Hadoop uses its defaults:
 * fs.defaultFS = file:/// (the local file system) and mapreduce.framework.name = local.
 *
 * @author root
 */
public class Driver {
    public static void main(String[] args) throws Exception {
        // Declare which user submits the job
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); // where the HDFS cluster is
        //conf.set("mapreduce.framework.name", "yarn"); // where to submit: yarn or local
        //conf.set("yarn.resourcemanager.hostname", "hadoop01"); // where the ResourceManager runs
        //conf.set("mapreduce.app-submission.cross-platform", "true"); // needed when submitting from Windows to Linux
        Job job = Job.getInstance(conf, "eclipseToCluster");
        // Mapper, reducer, and the jar to submit
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(Driver.class);
        //job.setJar("C:\\Users\\root\\Desktop\\wc.jar");
        // Map-output and final-output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths; the output Path is defined once and reused for the cleanup below
        Path outputPath = new Path("d:\\data\\out\\wc");
        FileInputFormat.addInputPath(job, new Path("D:\\data\\word.txt"));
        FileOutputFormat.setOutputPath(job, outputPath);
        // The job fails if the output directory already exists, so remove it first.
        // Resolving the file system from the Path (rather than using java.io.File) deletes from
        // whichever file system the job writes to, including HDFS if the settings above are enabled.
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        // Submit the job and wait for it; true = print progress
        boolean completion = job.waitForCompletion(true);
        int exitCode = completion ? 0 : 1;
        System.out.println(exitCode);
        // Also report the result as the process exit status so scripts can detect a failed job
        System.exit(exitCode);
    }
}
第二种:打包成 jar 放到 Linux 上运行,命令为 hadoop jar xxx.jar <Driver 类的全类名>
package nuc.edu.ls;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Submits the word-count job when run on the cluster via
 * {@code hadoop jar xxx.jar <fully-qualified Driver class> [input output]}.
 *
 * <p>If both an input and an output path are given on the command line they are used;
 * otherwise the job reads {@value #DEFAULT_INPUT} and writes {@value #DEFAULT_OUTPUT}.
 * The output directory must not already exist, or the job fails.
 *
 * @author root
 */
public class Driver {
    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "/wc.txt";
    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "/wordcount/wc-output";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = Job.getInstance(conf);
        // Mapper and reducer, and which class's jar is submitted
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(Driver.class);
        // Map-output and final-output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths: taken from the command line when both are given, so the same
        // jar can be rerun on other data without recompiling; otherwise the original defaults.
        boolean pathsGiven = args != null && args.length >= 2;
        String input = pathsGiven ? args[0] : DEFAULT_INPUT;
        String output = pathsGiven ? args[1] : DEFAULT_OUTPUT;
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        // Submit, then monitor the running job until it finishes
        boolean completion = job.waitForCompletion(true);
        System.out.println(completion?"程序执行完毕,没毛病!!!":"程序有问题,程序出bug了,赶紧加班调试!!!");
        // Report the result as the process exit status so scripts can detect a failed job
        System.exit(completion ? 0 : 1);
    }
}
第三种:从 Windows 远程提交任务到 Linux 集群的实现
package nuc.edu.ls;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.pengpeng.day05.firstMR.MapTask;
import cn.pengpeng.day05.firstMR.ReduceTask;
/**
 * Submits the word-count job from Eclipse (on Windows) to the remote Linux cluster.
 *
 * <p>The program must first be packaged as a jar at the path given to {@code job.setJar}.
 *
 * @author root
 */
public class Driver {
    public static void main(String[] args) throws Exception {
        // Declare which user submits the job
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); // where the HDFS cluster is
        conf.set("mapreduce.framework.name", "yarn"); // where to submit: yarn or local
        conf.set("yarn.resourcemanager.hostname", "hadoop01"); // where the ResourceManager runs
        conf.set("mapreduce.app-submission.cross-platform", "true"); // needed when submitting from Windows to Linux
        Job job = Job.getInstance(conf, "eclipseToCluster");
        // Mapper, reducer, and the jar to submit
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        // When launched from the IDE the classes are not inside a jar, so the prebuilt jar is
        // named explicitly instead of being located via setJarByClass.
        //job.setJarByClass(Driver.class);
        job.setJar("C:\\Users\\root\\Desktop\\wc.jar");
        // Map-output and final-output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths; the output Path is defined once and reused for the cleanup below
        Path outputPath = new Path("/wordcount/eclipse-out/");
        FileInputFormat.addInputPath(job, new Path("/wc.txt"));
        FileOutputFormat.setOutputPath(job, outputPath);
        // The job fails if the output directory already exists, so delete it (recursively) first
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        // Submit the job and wait for it; true = print progress
        boolean completion = job.waitForCompletion(true);
        int exitCode = completion ? 0 : 1;
        System.out.println(exitCode);
        // Also report the result as the process exit status so scripts can detect a failed job
        System.exit(exitCode);
    }
}