1. Overview
This article gives programming examples for the three modules of an MR (MapReduce) job: mapper, reducer, and runner. In the code below, multithreading is added inside the map method.
2. Examples
1. mapper
For the mapper, simply extend the Mapper class and write the map method.
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// HDFSUtils is a project-local helper that builds the Hadoop Configuration
public class GenFileChecksumMapper extends Mapper<LongWritable, Text, Text, Text> {
public static Configuration conf;
static {
conf = HDFSUtils.getConf();
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] v = convertToHdfsPath(value.toString()); // convert to an HDFS path
// guard: the value must be in the format "partition path \t database \t table"
if(v.length < 3){
return;
}
Path path;
final FileSystem fs;
try{
path = new Path(v[0]);
fs = path.getFileSystem(conf); // obtain the FileSystem from the path's scheme
}catch (Exception e){
e.printStackTrace();
return;
}
List<Entity> files = Lists.newArrayList();
if(fs.exists(path)) {
RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, true); // recursively list all files
while (it.hasNext()){
files.add(new Entity(fs.resolvePath(it.next().getPath()),
v[1], v[2])); // resolve to the real scheme
}
}
// Each directory holds about 40 files on average, so two threads are used by default.
// The thread count is scaled dynamically; it must not grow too large, or the RPC pressure on the NameNode becomes too high.
int threadNum = 2;
int size = files.size();
if(size > 1000){ // can be very large when har files are present
threadNum = 5;
}else if(size > 100){
threadNum = 3;
}
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
CompletableFuture[] futures = files.stream().map(f -> CompletableFuture.supplyAsync(() -> getFileChecksum(f.path), executor)
.whenComplete((checksum, err) -> {
try {
if (checksum != null) {
String checktime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
// Mapper.Context#write is not thread-safe, so serialize writes from the worker threads
synchronized (context) {
context.write(new Text(f.path.toString()),
new Text(checksum.toString() + "\t"
+ checktime + "\t"
+ f.db + "\t"
+ f.tbl
)); // output format: path \t checksum \t time \t db \t tbl
}
}
} catch (IOException | InterruptedException e1) {
e1.printStackTrace();
}
})).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join(); // wait for all checksum tasks to finish
executor.shutdown();
long time = (System.currentTimeMillis() - start)/1000;
if(size > 500 || time > 200){
System.out.println("dir: " + v[0] + ", file count: " + size + ", threads: " + threadNum +
", elapsed: " + time + "s");
}
}
private String[] convertToHdfsPath(String path){
if(path.startsWith("har://")){
return path.replace("har://", "").replaceAll(".har/.*?(?=\t)", ".har").split("\t");
}
return path.split("\t");
}
private FileChecksum getFileChecksum(Path path){
try {
//the path may have been resolved to a different (real) scheme, so the global fs cannot be reused here
FileSystem fileSystem = path.getFileSystem(conf);
return fileSystem.getFileChecksum(path);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
private static class Entity { // static: no reference to the enclosing mapper is needed
Path path;
String db;
String tbl;
public Entity(Path path, String db, String tbl) {
this.path = path;
this.db = db;
this.tbl = tbl;
}
}
}
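The trickiest part of the mapper is the har-path handling in convertToHdfsPath. Here is a minimal standalone sketch that replays the same replace/replaceAll chain; the sample path, database, and table names are made up:
import java.util.Arrays;
public class ConvertToHdfsPathDemo {
public static void main(String[] args) {
// hypothetical input line: partition path \t database \t table
String line = "har://nn1/warehouse/db1.db/tbl1/dt=20190819.har/part-0\tdb1\ttbl1";
// same chain as convertToHdfsPath: strip the har:// prefix,
// then cut everything between ".har/" and the next tab
String converted = line.replace("har://", "")
.replaceAll(".har/.*?(?=\t)", ".har");
System.out.println(Arrays.toString(converted.split("\t")));
// prints: [nn1/warehouse/db1.db/tbl1/dt=20190819.har, db1, tbl1]
}
}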
2. reducer
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class GenFileChecksumReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Paths resolved from har files may produce several records with the same key (file path),
// so only one (the first) checksum value is kept.
Iterator<Text> it = values.iterator();
if(!it.hasNext()){ return; }
context.write(key, it.next());
}
}
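After the reduce phase, each record of the final output is one line in the format path \t checksum \t time \t db \t tbl, matching what the mapper emits. For illustration (all values below are made up):
/user/warehouse/db1.db/tbl1/dt=20190819/part-0	MD5-of-0MD5-of-512CRC32C:a1b2c3d4	2019-08-19 12:00:00	db1	tbl1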
3. runner
In the runner we set some map and reduce parameters, and structure the class according to the ToolRunner conventions.
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GenFileChecksumRunner extends Configured implements Tool {
private static final Logger LOGGER = LoggerFactory.getLogger(GenFileChecksumRunner.class);
// ... some constants (e.g. QUEUE)
private static final String REQUIRED = "[required]";
public static final String OPTION_KEY_INPUT_PATH = "i";
public static final String OPTION_KEY_INPUT_PATH_LONG = "input";
public static final String OPTION_KEY_OUTPUT_PATH = "o";
public static final String OPTION_KEY_OUTPUT_PATH_LONG = "output";
private Options options;
private String[] args;
private String[] input; // multiple input paths are comma-separated
private String output;
private Job job;
public static void main(String[] args) throws Exception {
int retval = ToolRunner.run(new GenFileChecksumRunner(), args);
System.exit(retval);
}
private void setupJob() throws IOException{
Configuration conf = HDFSUtils.getConf();
// System.setProperty("HADOOP_USER_NAME", "xxx");
// System.setProperty("HADOOP_USER_PASSWORD", "yyy");
conf.set("mapreduce.job.priority","HIGH");
conf.set("mapreduce.job.queuename", QUEUE);
conf.set("mapreduce.job.reduces", "18");
Path outPath = new Path(output);
FileSystem fs = outPath.getFileSystem(conf);
if (fs.exists(outPath)) {
LOGGER.info("checksum job: delete output: [{}]", output);
fs.delete(outPath, true);
}
job = Job.getInstance(conf,"Generate All Core File Checksum Job");
//set the jar that contains the job classes
job.setJarByClass(GenFileChecksumRunner.class);
TextInputFormat.setMaxInputSplitSize(job, 1024*512L); //cap the input split size; number of maps per block = block size / split size
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//set the Mapper and Reducer classes for this job
for (String in : input) {
MultipleInputs.addInputPath(job, new Path(in.trim()),
TextInputFormat.class, GenFileChecksumMapper.class);
}
job.setReducerClass(GenFileChecksumReducer.class);
//set the output key-value types of the Mapper
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the output key-value types of the Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the output path for the job results
FileOutputFormat.setOutputPath(job, new Path(output));
}
@Override
public int run(String[] args) throws Exception {
if (args == null) {
String errMsg = "run method was passed a null String[] args.";
LOGGER.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
this.args = args;
parseInputArgs();
LOGGER.info("Invoking GenerateChecksumRunner.run with args\n"
+ "input :{}\n"
+ "output :{}\n",
input, output, output);
setupJob();
// waitForCompletion submits the job itself and blocks until it finishes
return job.waitForCompletion(true)? 0: 1;
}
private void parseInputArgs() throws IllegalArgumentException {
// Build our command line options
@SuppressWarnings("static-access")
Option inputOpt = OptionBuilder.withLongOpt(OPTION_KEY_INPUT_PATH_LONG)
.withDescription(REQUIRED + " input file path on HDFS,多个则用逗号分隔")
.isRequired(true)
.hasArgs(1)
.create(OPTION_KEY_INPUT_PATH);
@SuppressWarnings("static-access")
Option outputOpt = OptionBuilder.withLongOpt(OPTION_KEY_OUTPUT_PATH_LONG)
.withDescription(REQUIRED + " output path on HDFS")
.isRequired(true)
.hasArgs(1)
.create(OPTION_KEY_OUTPUT_PATH);
options = new Options();
options.addOption(inputOpt);
options.addOption(outputOpt);
// Create the parser and parse the String[] args
CommandLineParser parser = new BasicParser();
CommandLine commandLine = null;
try {
commandLine = parser.parse(options, args);
input = commandLine.getOptionValue(OPTION_KEY_INPUT_PATH).split(",");
LOGGER.info("Cli arg: {} = {}",
OPTION_KEY_INPUT_PATH_LONG, input);
output = commandLine.getOptionValue(OPTION_KEY_OUTPUT_PATH);
LOGGER.info("Cli arg: {} = {}",
OPTION_KEY_OUTPUT_PATH_LONG, output);
}catch (ParseException e) {
String errMsg = "Unable to parse command line properties, e = " + e.toString();
LOGGER.error(errMsg);
printUsage(false);
throw new IllegalArgumentException(errMsg);
}
validateArg(input, OPTION_KEY_INPUT_PATH_LONG);
validateArg(output, OPTION_KEY_OUTPUT_PATH_LONG);
}
private void printUsage(boolean exit) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("generateChecksum", options);
if (exit) {
System.exit(0);
}
}
private void validateArg(String arg, String argName)
throws IllegalArgumentException
{
if (arg == null || arg.length() == 0) {
String errMsg = argName + " argument was either null or empty";
LOGGER.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
}
private void validateArg(String[] arg, String argName)
throws IllegalArgumentException
{
if (arg == null || arg.length == 0) {
String errMsg = argName + " argument was either null or empty";
LOGGER.error(errMsg);
throw new IllegalArgumentException(errMsg);
}
}
}
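A quick sanity check on the split setting above: setMaxInputSplitSize(job, 1024*512L) caps each split at 512 KB, so a hypothetical 50 MB input list would be broken into roughly (50 * 1024) / 512 = 100 splits, i.e. about 100 map tasks, each handling a slice of the partition list. This is how the job fans out across many concurrent maps even though the text input itself is small.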
3. Run
Just run it with hadoop jar:
hadoop jar xxx-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar com.xxx.xxx.analysis.job.checksum.GenFileChecksumRunner -i /user/xxx/test-copy/whole-backup/partition_20190819_all.txt -o /user/xxx/test-copy/whole-backup/out-all > wholeall.log 2>&1 &
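For reference, each line of the input file (partition_20190819_all.txt above) must follow the "partition path \t database \t table" layout that the mapper expects; for example (entries are made up):
/user/warehouse/db1.db/tbl1/dt=20190819	db1	tbl1
har://nn1/warehouse/db2.db/tbl2/dt=20190819.har/part-0	db2	tbl2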