话不多说,直接上效果图。
可以看到,代码是在集群上运行的,而不是在本地。
而且,我所说的“自动”并不是其他博客里那样,需要自己先手动打好 jar 包,再在 Eclipse 中通过代码上传到集群;而是在 Eclipse 中直接运行这段代码,就会自动打好 jar 包,并自动提交到集群上。
完全不用动手!
完全不用动手!
完全不用动手!
非常方便!!!
示例
链接:https://pan.baidu.com/s/1KVvwsHLEI0CHdhiwcfIgww
提取码:ahzp
这是一个倒排索引的例子:data 是输入数据,InvertIndex 是 MapReduce 代码,EJob 是自动打包工具。
代码中的关键点我已经加了注释,按照注释设置好路径即可运行。
首先在 Eclipse 中连接 Hadoop(可以参照我的另一篇博客)。但仅这样配置的话,程序默认是在本地运行的,运行时会显示在 localhost 上执行。要想在集群上运行,需要把搭建 Hadoop 时修改过的那些配置文件(通常是 core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml 等)复制到本项目的 bin 目录下(原文此处为截图),这样就可以提交到集群了。可以直接拿我的代码改一下路径,体验一下完全自动打包提交的快感。
代码展示
InvertIndex 的代码(用来实现倒排索引,比较基础,不再赘述):
package test;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
//import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapred.JobConf;
/**
 * Builds an inverted index with MapReduce.
 *
 * <p>For every whitespace-separated token, the output lists the names of the input files it
 * occurs in, separated by ';'.
 *
 * <p>The job is packaged automatically by {@link EJob} and shipped to the cluster, so it can be
 * launched straight from the IDE.
 */
public class InvertIndex {

    /** Compiled-classes directory of this project; EJob packs it into the job jar. Adjust to your setup. */
    private static final String PROJECT_BIN_DIR = "/root/workspace/Myproject/bin";

    /** Input directory used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://192.168.75.128:9000/data/";

    /** Output prefix used when no paths are passed on the command line; a timestamp is appended. */
    private static final String DEFAULT_OUTPUT_PREFIX = "hdfs://192.168.75.128:9000/output/test";

    /**
     * Emits (token, source file name) for every whitespace-separated token of an input line.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across map() calls (standard Hadoop WordCount idiom): context.write serializes
        // the key immediately, so the same instance can be refilled for the next token.
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // main() uses TextInputFormat, whose splits are FileSplits, so the originating
            // file name can be read from the split.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            Text fileName = new Text(fileSplit.getPath().getName());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, fileName);
            }
        }
    }

    /**
     * Joins all file names emitted for a token into one ';'-separated string.
     *
     * <p>The mapper emits one value per occurrence of the token, so a file name appears once per
     * occurrence (duplicates are kept).
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> it = values.iterator();
            if (!it.hasNext()) {
                return;
            }
            StringBuilder all = new StringBuilder(it.next().toString());
            while (it.hasNext()) {
                all.append(';').append(it.next().toString());
            }
            context.write(key, new Text(all.toString()));
        }
    }

    /**
     * Packages the project, configures the job and submits it to the cluster.
     *
     * @param args optional Hadoop generic options followed by {@code <input> <outputPrefix>}.
     *     When no paths are given, {@link #DEFAULT_INPUT} and {@link #DEFAULT_OUTPUT_PREFIX} are
     *     used, so the job can be started from the IDE without arguments.
     * @throws IllegalStateException if the project class directory does not exist
     */
    public static void main(String[] args) throws Exception {
        // Key step: pack this project's compiled classes into a temporary jar. The jar's location
        // is chosen by EJob.
        File jarFile = EJob.createTempJar(PROJECT_BIN_DIR);
        if (jarFile == null) {
            // createTempJar returns null when the directory is missing. Fail here with a clear
            // message rather than with a NullPointerException at setJar below.
            throw new IllegalStateException("Class directory not found: " + PROJECT_BIN_DIR);
        }
        System.out.println(jarFile);
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        // Loads the Hadoop *-site.xml files from the classpath; copying them into the project's
        // bin directory is what makes the job go to the cluster instead of running locally.
        Configuration conf = new Configuration(true);
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length == 0) {
            otherArgs = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT_PREFIX};
        } else if (otherArgs.length < 2) {
            System.err.println("Usage: InvertIndex [generic options] <input> <outputPrefix>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "invert index");
        // Ship the freshly built jar with the job so the cluster can load Map and Reduce.
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // A timestamp suffix gives every run its own output directory, since MapReduce refuses
        // to write into an existing one.
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1] + time));

        System.out.println("Job Start!");
        if (job.waitForCompletion(true)) {
            System.out.println("ok!");
            System.exit(0);
        } else {
            System.out.println("error!");
            System.exit(-1);
        }
    }
}
EJob 的代码(这是一位大佬写的,非常好用)。大体思路是:先创建一个临时 jar 包,遍历 bin 目录下的文件并写入该 jar 包;再在作业中通过 setJar 指定这个 jar 包,随作业一起提交到集群,从而实现完全自动的打包提交。代码中需要修改的地方我已经加了注释,按照注释修改即可正常运行:
package test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
/**
 * Packs a directory of compiled classes into a temporary jar, so a MapReduce job can be
 * submitted to a cluster straight from the IDE without building the jar by hand.
 *
 * <p>It also keeps a list of extra classpath entries, registered via {@link #addClasspath(String)},
 * from which {@link #getClassLoader()} builds a class loader.
 */
public class EJob {

    /** Extra classpath entries registered through {@link #addClasspath(String)}. */
    private static List<URL> classPath = new ArrayList<URL>();

    /** Directory the jar is written to by {@link #createTempJar(String)}. Adjust to your setup. */
    private static final String DEFAULT_JAR_DIR = "/root/workspace/Myproject/bin/zf/";

    /**
     * Packs every file below {@code root} into a new temporary jar in {@link #DEFAULT_JAR_DIR}.
     *
     * @param root the project's compiled-classes (bin) directory
     * @return the created jar, or {@code null} if {@code root} does not exist
     * @throws IOException if the jar cannot be created or a file cannot be read
     */
    public static File createTempJar(String root) throws IOException {
        return createTempJar(root, new File(DEFAULT_JAR_DIR));
    }

    /**
     * Packs every file below {@code root} into a new temporary jar created in {@code outputDir}.
     *
     * <p>Entry names are paths relative to {@code root}, joined with '/'. The jar is deleted when
     * the JVM exits. {@code outputDir} is created if it does not exist. It may lie inside
     * {@code root}; the jar being written is never packed into itself.
     *
     * @param root the project's compiled-classes (bin) directory
     * @param outputDir directory in which the temporary jar is created
     * @return the created jar, or {@code null} if {@code root} does not exist
     * @throws IOException if the jar cannot be created or a file cannot be read
     */
    public static File createTempJar(String root, File outputDir) throws IOException {
        File rootDir = new File(root);
        if (!rootDir.exists()) {
            return null;
        }
        // File.createTempFile fails when the directory is missing, so create it first.
        if (!outputDir.isDirectory() && !outputDir.mkdirs() && !outputDir.isDirectory()) {
            throw new IOException("Cannot create jar output directory: " + outputDir);
        }

        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");

        final File jarFile = File.createTempFile("EJob-", ".jar", outputDir);
        // Delete the temporary jar when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                jarFile.delete();
            }
        });

        // The output directory may be inside root (it is with the default), so the jar being
        // written must be excluded from the traversal or it would be packed into itself.
        File self = jarFile.getCanonicalFile();
        FileOutputStream fileOut = new FileOutputStream(jarFile);
        JarOutputStream out = null;
        try {
            out = new JarOutputStream(fileOut, manifest);
            createTempJarInner(out, rootDir, "", self);
        } finally {
            if (out != null) {
                out.close(); // also closes fileOut
            } else {
                fileOut.close();
            }
        }
        return jarFile;
    }

    /**
     * Recursively writes {@code f} into {@code out}.
     *
     * <p>Directories are descended into. Each regular file becomes an entry named {@code base},
     * except the file equal to {@code exclude}, which is skipped.
     *
     * @param base entry name for {@code f}; "" for the root directory
     * @param exclude canonical file to skip, or {@code null}
     */
    private static void createTempJarInner(JarOutputStream out, File f, String base, File exclude)
            throws IOException {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) {
                // listFiles returns null on an I/O error or missing permission.
                throw new IOException("Cannot list directory: " + f);
            }
            String prefix = base.length() > 0 ? base + "/" : base;
            for (File child : children) {
                createTempJarInner(out, child, prefix + child.getName(), exclude);
            }
            return;
        }
        if (exclude != null && f.getCanonicalFile().equals(exclude)) {
            return;
        }
        out.putNextEntry(new JarEntry(base));
        FileInputStream in = new FileInputStream(f);
        try {
            byte[] buffer = new byte[8192];
            for (int n = in.read(buffer); n != -1; n = in.read(buffer)) {
                out.write(buffer, 0, n);
            }
        } finally {
            in.close();
        }
        out.closeEntry();
    }

    /**
     * Returns a new class loader over the entries registered with {@link #addClasspath(String)}.
     *
     * <p>Its parent is the current thread's context class loader. It falls back to EJob's own
     * loader, and then to the system class loader.
     */
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }

    /**
     * Registers {@code component} (a jar or directory path) for {@link #getClassLoader()}.
     *
     * <p>Null, empty or non-existent paths and duplicates are ignored. This is best-effort: a
     * path that cannot be resolved is reported on stderr and skipped.
     */
    public static void addClasspath(String component) {
        if (component == null || component.length() == 0) {
            return;
        }
        File f = new File(component);
        if (!f.exists()) {
            return;
        }
        try {
            URL key = f.getCanonicalFile().toURI().toURL();
            if (!classPath.contains(key)) {
                classPath.add(key);
            }
        } catch (IOException e) {
            System.err.println("EJob: ignoring classpath component " + component + ": " + e);
        }
    }
}