之前一直试图在笔记本上编写并运行并行程序,后来发现实在太慢了……具体思路是:把原始结果的 key 作为顶点 id、值作为边读进去;启动时加载对比结果文件,并将其内容存入一个 TreeMap;顶点计算时直接比对即可。非常快,20 秒就有结果了。
package org.apache.giraph.examples;
import java.io.IOException;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
/**
 * Giraph computation that compares each vertex's original result with a
 * reference result looked up by vertex id.
 *
 * <p>In superstep 0 the target id of the vertex's out-edge is taken as the
 * original value (if there are several edges, the last one iterated wins; if
 * there are none, {@link Integer#MIN_VALUE} is used) and compared with the
 * entry stored for the vertex id in {@code this.vertices}. When the two differ,
 * or no reference entry exists, the vertex value is set to {@code -1}.
 * Every vertex votes to halt at the end of each call.
 *
 * <p>NOTE(review): {@code vertices} is not declared in this class; it is
 * presumably an id-to-value map provided by the superclass - confirm.
 */
public class ResultCompareComputation extends BasicComputation<IntWritable, IntWritable,
    NullWritable, IntWritable> {
  /**
   * Flags the vertex with {@code -1} when its original result disagrees with
   * the reference result, then halts the vertex.
   *
   * @param vertex vertex whose id is the result key and whose out-edge target
   *        carries the original result value
   * @param messages incoming messages (not used)
   * @throws IOException declared by the overridden method
   */
  @Override
  public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
      Iterable<IntWritable> messages) throws IOException {
    if (getSuperstep() == 0) {
      // Sentinel that stays in place when the vertex has no out-edges.
      int sourceValue = Integer.MIN_VALUE;
      for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
        sourceValue = edge.getTargetVertexId().get();
      }
      Integer improveValue = this.vertices.get(Integer.valueOf(vertex.getId().get()));
      // A missing reference entry counts as a mismatch.
      if (improveValue == null || improveValue.intValue() != sourceValue) {
        vertex.setValue(new IntWritable(-1));
      }
    }
    vertex.voteToHalt();
  }
}
package org.apache.giraph.partition;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Loads a reference ("comparison") result file into an in-memory sorted map.
 *
 * <p>The file location is read from the {@code giraph.result.compare}
 * configuration key. Every non-empty line is expected to hold an integer key
 * and an integer value separated by a tab character.
 */
public class ResultComparisionReader implements Runnable {
  /** Location of the comparison file; {@code null} when the key is not configured. */
  String path;
  /** Key/value pairs parsed so far; filled by {@link #loadResultFile()}. */
  public TreeMap<Integer, Integer> vertices = new TreeMap<Integer, Integer>();

  /**
   * Creates a reader for the file named by {@code giraph.result.compare}.
   *
   * @param configuration configuration to read the file location from
   */
  public ResultComparisionReader(ImmutableClassesGiraphConfiguration configuration) {
    path = configuration.get("giraph.result.compare");
  }

  /**
   * Reads the comparison file and stores its entries in {@link #vertices}.
   *
   * <p>Loading is best-effort: failures are reported on standard error and
   * never propagated, so the returned map may be empty or partial. Malformed
   * lines are skipped individually instead of ending the whole load.
   *
   * @return the {@link #vertices} map, including any entries loaded earlier
   */
  public TreeMap<Integer, Integer> loadResultFile() {
    if (path == null) {
      System.err.println("giraph.result.compare is not set; no comparison data loaded");
      return vertices;
    }
    try {
      FileSystem hdfs = FileSystem.get(URI.create(path), new Configuration());
      // Only the stream resources are closed here; the FileSystem handle itself is not.
      try (FSDataInputStream fp = hdfs.open(new Path(path));
          BufferedReader reader =
              new BufferedReader(new InputStreamReader(fp, StandardCharsets.UTF_8))) {
        int lineNumber = 0;
        String line;
        while ((line = reader.readLine()) != null) {
          lineNumber++;
          if (!line.isEmpty()) {
            storeEntry(line, lineNumber);
          }
        }
      }
    } catch (IOException e) {
      System.err.println("Failed to read comparison file " + path);
      e.printStackTrace();
    } catch (RuntimeException e) {
      // e.g. an invalid URI; kept non-fatal so callers still get a map back.
      System.err.println("Failed to load comparison file " + path);
      e.printStackTrace();
    }
    return vertices;
  }

  /** Parses one tab-separated {@code key<TAB>value} line into {@link #vertices}. */
  private void storeEntry(String line, int lineNumber) {
    String[] keyValue = line.split("\t");
    if (keyValue.length < 2) {
      System.err.println("Skipping line " + lineNumber + " of " + path
          + ": expected key<TAB>value but got '" + line + "'");
      return;
    }
    try {
      Integer key = Integer.valueOf(keyValue[0].trim());
      Integer value = Integer.valueOf(keyValue[1].trim());
      vertices.put(key, value);
    } catch (NumberFormatException e) {
      System.err.println("Skipping line " + lineNumber + " of " + path
          + ": non-integer field in '" + line + "'");
    }
  }

  /** Does nothing. */
  @Override
  public void run() {
  }
}
这种做法是可以达到要求的。