This article uses Mahout 0.9. Because that version natively supports Hadoop 1.x, it is straightforward to write code that calls the Taste algorithms; the same approach does not work with Hadoop 2.x.
Steps:
1. Copy the three configuration files from the Hadoop 1.x installation directory onto the Java project's classpath: core-site.xml, hdfs-site.xml, and mapred-site.xml (the code below loads them from a hadoop/ directory on the classpath).
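To confirm the files are actually picked up, a quick check can load them the same way the code in the later steps does and print the addresses they define. This is a minimal sketch; the ConfigCheck class name is only for illustration, and fs.default.name / mapred.job.tracker are the standard Hadoop 1.x property keys:
import org.apache.hadoop.conf.Configuration;

public class ConfigCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same resource names as used by HdfsDAO.config() and ItemCFHadoop1x.config() below
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        // Hadoop 1.x property keys for the NameNode and JobTracker addresses
        System.out.println("fs.default.name = " + conf.get("fs.default.name"));
        System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker"));
    }
}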
2. Write a DAO class for accessing HDFS
The code is as follows:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

public class HdfsDAO {

    // Default HDFS NameNode address
    private static final String HDFS = "hdfs://192.168.1.210:9000/";

    private String hdfsPath;
    private Configuration conf;

    public HdfsDAO(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(conf);
        hdfs.copyFile("datafile/item.csv", "/tmp/new");
        hdfs.ls("/tmp/new");
    }

    // Build a JobConf from the Hadoop configuration files on the classpath
    public static JobConf config() {
        JobConf conf = new JobConf(HdfsDAO.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    // Create a directory if it does not already exist
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    // Delete a path, like "hadoop fs -rmr"; deleteOnExit takes effect when fs.close() runs
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    // List a directory, like "hadoop fs -ls"
    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    // Create a file on HDFS with the given string content
    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    // Upload a local file to HDFS
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    // Download a file from HDFS to the local file system
    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from " + remote + " to " + local);
        fs.close();
    }

    // Print the content of an HDFS file, like "hadoop fs -cat"
    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }

    // Show the block locations of a file (kept commented out, as in the original)
    public void location() throws IOException {
        // String folder = hdfsPath + "create/";
        // String file = "t2.txt";
        // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration());
        // FileStatus f = fs.getFileStatus(new Path(folder + file));
        // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        //
        // System.out.println("File Location: " + folder + file);
        // for (BlockLocation bl : list) {
        //     String[] hosts = bl.getHosts();
        //     for (String host : hosts) {
        //         System.out.println("host:" + host);
        //     }
        // }
        // fs.close();
    }
}
3. Write the code that calls Mahout's item-based collaborative filtering algorithm
import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class ItemCFHadoop1x {

    private static final String HDFS = "hdfs://192.168.1.210:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/item.csv";
        String inPath = HDFS + "/user/hdfs/userCF";
        String inFile = inPath + "/item.csv";
        String outPath = HDFS + "/user/hdfs/userCF/result/";
        String outFile = outPath + "/part-r-00000";
        String tmpPath = HDFS + "/tmp/" + System.currentTimeMillis();

        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);

        // Clean up, upload the data file, and verify it
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

        // Build the command-line arguments for Mahout's RecommenderJob
        StringBuilder sb = new StringBuilder();
        sb.append("--input ").append(inPath);
        sb.append(" --output ").append(outPath);
        sb.append(" --booleanData true");
        sb.append(" --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.EuclideanDistanceSimilarity");
        sb.append(" --tempDir ").append(tmpPath);
        args = sb.toString().split(" ");

        // Run the distributed item-based recommender and print its output
        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        job.run(args);

        hdfs.cat(outFile);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(ItemCFHadoop1x.class);
        conf.setJobName("ItemCFHadoop");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
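The final hdfs.cat(outFile) prints the recommendation results. In Mahout 0.9 each output line pairs a user ID with a bracketed list of itemID:score entries; below is a minimal parsing sketch under that assumption (the RecommendationLineParser class and the sample line are illustrative, not actual job output, so verify the format against your own part-r-00000):
import java.util.ArrayList;
import java.util.List;

public class RecommendationLineParser {

    // Splits one RecommenderJob output line, e.g. "1\t[104:2.8,106:2.5]",
    // into readable "user -> item (score)" strings.
    public static List<String> parse(String line) {
        List<String> result = new ArrayList<String>();
        String[] parts = line.split("\t");
        if (parts.length < 2) {
            return result;
        }
        String userId = parts[0];
        String items = parts[1].replaceAll("[\\[\\]]", "");
        for (String pair : items.split(",")) {
            String[] kv = pair.split(":");
            result.add("user " + userId + " -> item " + kv[0] + " (score " + kv[1] + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical line in the assumed output format
        for (String s : parse("1\t[104:2.8,106:2.5]")) {
            System.out.println(s);
        }
    }
}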
4. Place the data file item.csv under the datafile/ directory of the local project (the path the code above reads); its content is as follows:
1,101,5
1,102,3
1,103,2.5
2,101,2
2,102,2.5
2,103,5
2,104,2
3,101,2.5
3,104,4
3,105,4.5
3,107,5
4,101,5
4,103,3
4,104,4.5
4,106,4
5,101,4
5,102,3
5,103,2
5,104,4
5,105,3.5
5,106,4
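As an optional local check before running the Hadoop job, the same file can be fed to Taste's in-memory item-based recommender. This is a minimal sketch, assuming mahout-core 0.9 is on the classpath; the LocalItemCFCheck class name is only for illustration:
import java.io.File;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;

public class LocalItemCFCheck {
    public static void main(String[] args) throws Exception {
        // Load the same item.csv that the Hadoop job uses
        DataModel model = new FileDataModel(new File("datafile/item.csv"));
        ItemSimilarity similarity = new EuclideanDistanceSimilarity(model);
        GenericItemBasedRecommender recommender = new GenericItemBasedRecommender(model, similarity);
        // Print the top 3 recommendations for user 1
        for (RecommendedItem item : recommender.recommend(1, 3)) {
            System.out.println(item.getItemID() + " " + item.getValue());
        }
    }
}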
That concludes the walkthrough.