Java 代码（共四个类，见下方项目架构）
import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 0:44 * Description: */ public class PartitionerMain extends Configured implements Tool { public int run(String[] args) throws Exception { //job类,组装MR Job job=Job.getInstance(super.getConf(),PartitionerMain.class.getSimpleName()); //打包运行 job.setJarByClass(PartitionerMain.class); //第一步:读取文件,解析kv对 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path(args[0])); //自定义map逻辑 job.setMapperClass(partitionMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //第三部 分区 job.setPartitionerClass(partitionerOwn.class); //自定义reduce逻辑 job.setReducerClass(partitionReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(2); //设置输出类 job.setOutputFormatClass(TextOutputFormat.class); //输出路径不写死,通过参数传进来 TextOutputFormat.setOutputPath(job,new Path(args[1])); //提交任务 boolean b =job.waitForCompletion(true); return b?0:1; } public static void main(String args[]) throws Exception { int run= ToolRunner.run(new Configuration(),new PartitionerMain(),args); System.exit(run); } } import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /** * Created with IntelliJ IDEA. 
* User: @别慌 * Date: 2019-06-16 * Time: 0:56 * Description: */ public class partitionerOwn extends Partitioner<Text, NullWritable> { public int getPartition(Text text, NullWritable nullWritable, int i) { String[] spilt=text.toString().split("\t"); String gameResult=spilt[5]; if (null != gameResult && ""!=gameResult){ if (Integer.parseInt(gameResult)>15){ return 0; }else { return 1; } } return 0; } } import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 0:47 * Description: */ public class partitionMap extends Mapper<LongWritable, Text,Text,NullWritable> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); } } import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 1:14 * Description: */ public class partitionReduce extends Reducer<Text, NullWritable,Text,NullWritable> { protected void reduce(Text key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ali</groupId>
    <artifactId>hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Hadoop client-side libraries, all pinned to the same release. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>
Java 项目架构（四个 public 类，各自对应一个同名 .java 源文件）
- PartitionerMain
- partitionerOwn
- partitionMap
- partitionReduce
hadoop jar mouse.jar PartitionerMain /partition /outtest
参数依次为：jar 包、main 类、HDFS 上的输入路径、HDFS 输出目录（不能事先存在）
源文件(partition)
1 0 1 2017-07-31 23:10:12 834232 6 sadasaa
2 9 6 2019-06-76 78:97:12 687907 45 dsdskd