利用Giraph进行快速并行结果比对

之前一直试图在笔记本上写并行程序来做结果比对,后来发现实在太慢了,于是改用 Giraph。具体思路是:把原始结果的 key 作为顶点 id、value 作为边的目标读入;作业启动时加载待对比的结果文件,并将其内容存入一个 TreeMap;顶点计算时直接查表比对即可。这种方式非常快,20 秒就有结果了。

package org.apache.giraph.examples;

import java.io.IOException;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

/**
 * Giraph computation that checks, for every vertex, whether the value encoded
 * in its outgoing edge matches the value stored for the same vertex id in the
 * {@code vertices} lookup table.
 *
 * <p>Vertices whose values differ (or that have no entry in the table) get
 * their vertex value set to {@code -1}; all other vertices keep the value they
 * were loaded with. All work happens in superstep 0.
 *
 * <p>NOTE(review): {@code vertices} is not declared in this class; presumably
 * it is a map provided by a patched base class and filled from the comparison
 * file -- confirm against the modified Giraph sources.
 */
public class ResultCompareComputation
		extends BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {

	/**
	 * Compares this vertex's original value with the expected one and flags a
	 * mismatch by setting the vertex value to {@code -1}.
	 *
	 * @param vertex   the vertex being processed; its id is the lookup key and
	 *                 the target id of its (last) edge is the original value
	 * @param messages incoming messages; not read by this computation
	 * @throws IOException declared by the overridden method; not thrown here
	 */
	@Override
	public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
			Iterable<IntWritable> messages) throws IOException {
		// Only the first superstep does any work; afterwards just halt.
		if (getSuperstep() != 0) {
			vertex.voteToHalt();
			return;
		}

		// The original value is the target id of the edge. If there are several
		// edges the last one wins; with no edges the sentinel MIN_VALUE is used.
		int originalValue = Integer.MIN_VALUE;
		for (Edge<IntWritable, NullWritable> outgoing : vertex.getEdges()) {
			originalValue = outgoing.getTargetVertexId().get();
		}

		Integer expectedValue = this.vertices.get(Integer.valueOf(vertex.getId().get()));

		// A missing table entry (null) counts as a mismatch.
		boolean matches = expectedValue != null && expectedValue.intValue() == originalValue;
		if (!matches) {
			vertex.setValue(new IntWritable(-1));
		}

		vertex.voteToHalt();
	}
}
package org.apache.giraph.partition;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Loads the comparison result file named by the {@code giraph.result.compare}
 * configuration key into an in-memory {@link TreeMap}.
 *
 * <p>Every non-empty line of the file is expected to contain two integers
 * separated by a tab: the key and its value. Loading is best-effort: problems
 * are reported on {@code System.err} and whatever was parsed successfully is
 * still returned.
 */
public class ResultComparisionReader implements Runnable {
	/** Location of the comparison file as read from the configuration. */
	String path ;
	/** Key/value pairs parsed from the comparison file by {@link #loadResultFile()}. */
	public TreeMap<Integer,Integer> vertices = new TreeMap<Integer,Integer>()  ;

	/**
	 * Reads the comparison file location from the job configuration.
	 *
	 * @param configuration job configuration holding the
	 *                      {@code giraph.result.compare} entry
	 */
	public ResultComparisionReader(ImmutableClassesGiraphConfiguration configuration)
	{
		path = configuration.get("giraph.result.compare") ;
	}

	/**
	 * Reads the comparison file and stores each key/value pair in
	 * {@link #vertices}.
	 *
	 * <p>Empty lines are ignored. Malformed lines (fewer than two tab-separated
	 * fields, or non-integer fields) are reported and skipped so that one bad
	 * line no longer silently truncates the whole load. I/O failures are
	 * reported and end the load; entries parsed so far are kept.
	 *
	 * @return the {@link #vertices} map (never {@code null}; possibly empty or
	 *         partial if an error occurred)
	 */
	public TreeMap<Integer,Integer>  loadResultFile() {
		if (path == null) {
			// Presumably the configuration key was not set; nothing to load.
			System.err.println("ResultComparisionReader: giraph.result.compare is not set, nothing loaded");
			return vertices;
		}
		try {
			Configuration conf = new Configuration();
			// The FileSystem handle is deliberately not closed here: FileSystem.get
			// may hand out a shared, cached instance that other code still uses.
			FileSystem hdfs = FileSystem.get(URI.create(path), conf);
			// try-with-resources closes both streams even when reading fails.
			try (FSDataInputStream in = hdfs.open(new Path(path));
					BufferedReader reader = new BufferedReader(
							new InputStreamReader(in, StandardCharsets.UTF_8))) {
				String line;
				int lineNumber = 0;
				while ((line = reader.readLine()) != null) {
					lineNumber++;
					if (!line.isEmpty()) {
						parseLine(line, lineNumber);
					}
				}
			}
		} catch (IOException | RuntimeException e) {
			// Keep the original best-effort contract (no exception escapes), but
			// say which file failed instead of printing a bare stack trace.
			System.err.println("ResultComparisionReader: failed to load '" + path + "': " + e);
			e.printStackTrace();
		}
		return vertices;
	}

	/** Parses one "key TAB value" line into {@link #vertices}, skipping it with a warning if malformed. */
	private void parseLine(String line, int lineNumber) {
		String[] keyValue = line.split("\t");
		if (keyValue.length < 2) {
			System.err.println("ResultComparisionReader: skipping line " + lineNumber
					+ " of '" + path + "' (expected key<TAB>value): " + line);
			return;
		}
		try {
			Integer key = Integer.valueOf(keyValue[0].trim());
			Integer value = Integer.valueOf(keyValue[1].trim());
			vertices.put(key, value) ;
		} catch (NumberFormatException e) {
			System.err.println("ResultComparisionReader: skipping line " + lineNumber
					+ " of '" + path + "' (not an integer): " + line);
		}
	}

	/** Required by {@link Runnable}; intentionally does nothing. */
	@Override
	public void run() {
	}
}

经验证,这种做法可以满足要求。

猜你喜欢

转载自blog.csdn.net/cloudeagle_bupt/article/details/82932673