测试文件:用 GraphX 的 rmatGraph(1000000, 2000000) 生成,去重后有 494587 个点、1997743 条边。
运行环境:内存 246 GB,71 个 core。
测试三个运行例子1:Graph connectedComponents 2:GraphFrame connectedComponents 3:GraphFrame connectedComponents setAlgorithm( "graphx" )
运行结果
1和2 差不多,都在2.6和2.7分钟左右。
3最快 1.3分钟。
1和3 的job是一样的都是10个,2的job是28个。
提交到yarn 上的命令是
spark-submit --class blost.ConnectedComponentTest --master yarn --deploy-mode cluster --num-executors 10 --executor-cores 2 --driver-memory 2g --executor-memory 2g a.jar
后面加 1或者2或者3 启动三个计算。
做了测试后,发现 1 和 3 用的时间不一样,理论上应该一样。查看运行监控后,对 1 做了两处修改:
把顶点和边的属性 map 成 BoxedUnit,这样数据量变小了;StorageLevel 从 MEMORY_AND_DISK_SER_2 改成 StorageLevel.MEMORY_AND_DISK。
运行时间变为 1.1 分钟,和 3 差不多。
代码如下
/**
 * Optimized GraphX connected-components run: vertex and edge attributes are
 * mapped to BoxedUnit (zero payload per element) and both caches use
 * MEMORY_AND_DISK instead of MEMORY_AND_DISK_SER_2, which cut the runtime
 * to ~1.1 minutes on this data set.
 */
private static void graphTest(JavaSparkContext ctx, String vPath, String ePath)
{
// Vertices: keep only the id parsed from the first column; the attribute is BoxedUnit.UNIT.
JavaRDD<Tuple2<Object, BoxedUnit>> verticeRDD = ctx.textFile( vPath ).map( s->{
String[] strs = pattern.split( s );
return new Tuple2<Object, BoxedUnit>(Long.parseLong( strs[0] ), BoxedUnit.UNIT);
});
// Edges: src/dst ids from the first two columns; the weight column is dropped entirely.
JavaRDD<Edge<BoxedUnit>> edgeRDD = ctx.textFile( ePath ).map( s->
{
String[] strs = pattern.split( s );
return new Edge<BoxedUnit>(Long.parseLong( strs[0] ), Long.parseLong( strs[1]), BoxedUnit.UNIT);
});
// MEMORY_AND_DISK (deserialized) for both vertex and edge caches; the default
// BoxedUnit attribute also goes to vertices referenced only by edges.
Graph<BoxedUnit, BoxedUnit> g = Graph.apply( verticeRDD.rdd( ), edgeRDD.rdd( ), BoxedUnit.UNIT,
StorageLevel.MEMORY_AND_DISK( ), StorageLevel.MEMORY_AND_DISK(),
ClassManifestFactory.classType( BoxedUnit.class ),ClassManifestFactory.classType( BoxedUnit.class ) );
// count() only materializes the result; nothing is pulled back to the driver.
g.ops( ).connectedComponents( ).vertices( ).count( );
}
另外,2 需要 checkpoint,用了 100 多个 block。
测试代码如下
package blost;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.Graph;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import org.graphframes.GraphFrame;
import scala.Tuple2;
import scala.reflect.ClassManifestFactory;
import scala.reflect.ClassTag;
/**
 * Benchmark driver comparing three connected-components implementations on the
 * same vertex/edge text files on HDFS:
 * <ul>
 *   <li>1 &mdash; GraphX {@code Graph.connectedComponents}</li>
 *   <li>2 &mdash; GraphFrames {@code connectedComponents} (default algorithm; requires a checkpoint dir)</li>
 *   <li>3 &mdash; GraphFrames {@code connectedComponents} with {@code setAlgorithm("graphx")}</li>
 * </ul>
 * The variant is selected by the first command-line argument ("1", "2" or "3").
 */
public class ConnectedComponentTest
{
    // Compiled once and reused for every input record (Pattern compilation is expensive).
    private static final Pattern pattern = Pattern.compile( "\\s+" );
    // ClassTags required by the Scala-facing Graph.apply API.
    private static final ClassTag<String> tagString = ClassManifestFactory.classType( String.class );
    private static final ClassTag<Double> tagDouble = ClassManifestFactory.classType( Double.class );

    public static void main( String[] args )
    {
        if (args.length == 0)
        {
            // FIX: a missing argument is an error; the original exited with
            // status 0 and no message, which YARN reports as success.
            System.err.println( "usage: ConnectedComponentTest <1|2|3>" );
            System.exit( 1 );
        }
        // FIX: the app name said "write graph data" (copy-paste leftover);
        // this job only reads the graph and runs connected components.
        SparkConf sparkConf = new SparkConf().setAppName( "connected components test " + args[0] );
        JavaSparkContext ctx = new JavaSparkContext( sparkConf );
        String vPath = "hdfs://...";
        String ePath = "hdfs://...";
        try
        {
            switch (args[0])
            {
                case "1":
                    graphTest( ctx, vPath, ePath );
                    break;
                case "2":
                    graphFrameTest( ctx, vPath, ePath );
                    break;
                case "3":
                    graphFrameWithGraphXTest( ctx, vPath, ePath );
                    break;
                default:
                    // FIX: the original silently did nothing for unknown values.
                    System.err.println( "unknown variant '" + args[0] + "', expected 1, 2 or 3" );
                    break;
            }
        }
        finally
        {
            // FIX: always stop the context, even if the selected variant throws.
            ctx.stop( );
        }
    }

    /**
     * Variant 1: plain GraphX. Vertices keep a String attribute, edges a Double
     * weight, and both caches use MEMORY_AND_DISK_SER_2.
     */
    private static void graphTest(JavaSparkContext ctx, String vPath, String ePath)
    {
        // Line format: "<id> <name>" separated by whitespace.
        JavaRDD<Tuple2<Object, String>> verticeRDD = ctx.textFile( vPath ).map( s -> {
            String[] strs = pattern.split( s );
            return new Tuple2<Object, String>( Long.parseLong( strs[0] ), strs[1] );
        } );
        // Line format: "<src> <dst> <weight>" separated by whitespace.
        JavaRDD<Edge<Double>> edgeRDD = ctx.textFile( ePath ).map( s -> {
            String[] strs = pattern.split( s );
            return new Edge<Double>( Long.parseLong( strs[0] ), Long.parseLong( strs[1] ), Double.parseDouble( strs[2] ) );
        } );
        Graph<String, Double> g = Graph.apply( verticeRDD.rdd( ), edgeRDD.rdd( ), "",
                StorageLevel.MEMORY_AND_DISK_SER_2( ), StorageLevel.MEMORY_AND_DISK_SER_2( ),
                tagString, tagDouble );
        // NOTE(review): collect() pulls ~500k vertex rows into a 2 GB driver just
        // to time the job; count() would be cheaper if only the timing matters.
        Graph.graphToGraphOps( g, tagString, tagDouble ).connectedComponents( ).vertices( ).toJavaRDD( ).collect( );
    }

    /**
     * Variant 2: GraphFrames default algorithm. The default implementation
     * checkpoints intermediate results, hence the checkpoint directory.
     */
    private static void graphFrameTest(JavaSparkContext ctx, String vPath, String ePath)
    {
        ctx.setCheckpointDir( "hdfs://..." );
        createGraphFrame( ctx, vPath, ePath ).connectedComponents( ).run( ).collect( );
    }

    /** Variant 3: GraphFrames delegating to the GraphX implementation. */
    private static void graphFrameWithGraphXTest(JavaSparkContext ctx, String vPath, String ePath)
    {
        createGraphFrame( ctx, vPath, ePath ).connectedComponents( ).setAlgorithm( "graphx" ).run( ).collect( );
    }

    /** Builds a GraphFrame from the same text files used by the GraphX variant. */
    private static GraphFrame createGraphFrame(JavaSparkContext ctx, String vPath, String ePath)
    {
        SQLContext sqlCtx = SQLContext.getOrCreate( ctx.sc( ) );
        JavaRDD<Row> vertices = ctx.textFile( vPath ).map( s -> {
            String[] strs = pattern.split( s );
            return RowFactory.create( Long.parseLong( strs[0] ), strs[1] );
        } );
        JavaRDD<Row> edges = ctx.textFile( ePath ).map( s -> {
            String[] strs = pattern.split( s );
            return RowFactory.create( Long.parseLong( strs[0] ), Long.parseLong( strs[1] ), Double.parseDouble( strs[2] ) );
        } );
        return new GraphFrame( sqlCtx.createDataFrame( vertices, createVType( ) ),
                sqlCtx.createDataFrame( edges, createEType( ) ) );
    }

    /** Vertex schema: "id" (long, required) and "name" (string, nullable) — GraphFrames requires an "id" column. */
    private static StructType createVType()
    {
        List<StructField> vList = new ArrayList<StructField>();
        vList.add( DataTypes.createStructField( "id", DataTypes.LongType, false ) );
        vList.add( DataTypes.createStructField( "name", DataTypes.StringType, true ) );
        return DataTypes.createStructType( vList );
    }

    /** Edge schema: "src"/"dst" (long, required) — GraphFrames' required column names — plus a "weight" double. */
    private static StructType createEType()
    {
        List<StructField> eList = new ArrayList<StructField>();
        eList.add( DataTypes.createStructField( "src", DataTypes.LongType, false ) );
        eList.add( DataTypes.createStructField( "dst", DataTypes.LongType, false ) );
        eList.add( DataTypes.createStructField( "weight", DataTypes.DoubleType, false ) );
        return DataTypes.createStructType( eList );
    }
}