GraphFrame 实现最小生成树

将数据转换为 GraphFrame 之后计算效率太低,尤其是 connectedComponents 默认算法的效率惨不忍睹,必须通过 setAlgorithm("graphx") 指定使用 graphx 算法。

代码如下:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.graphframes.GraphFrame;


/**
 * 
 */


public class MiniTreeGraphFrame
{
private static SQLContext sqlCtx;
private static StructType eType;
public static void main( String[] args )
{
SparkConf conf = new SparkConf( ).setAppName( "Short Paths" ).setMaster( "local" );

JavaSparkContext ctx = new JavaSparkContext( conf );

sqlCtx = SQLContext.getOrCreate( ctx.sc( ) );

List<StructField> vList = new ArrayList<StructField>( );

vList.add( DataTypes.createStructField( "id", DataTypes.LongType, false ) );
vList.add( DataTypes.createStructField( "name", DataTypes.StringType, true ) );

StructType vType = DataTypes.createStructType( vList );

List<StructField> eList = new ArrayList<StructField>();

eList.add( DataTypes.createStructField( "src", DataTypes.LongType, false ) );
eList.add( DataTypes.createStructField( "dst", DataTypes.LongType, false ) );
eList.add( DataTypes.createStructField( "weight", DataTypes.DoubleType, true ) );

eType = DataTypes.createStructType( eList );

JavaRDD<Row> verticeRow = ctx.parallelize( Arrays.asList( 
RowFactory.create( 1L,"a" ),
RowFactory.create( 2L,"b" ),
RowFactory.create( 3L,"c" ),
RowFactory.create( 4L,"d" ),
RowFactory.create( 5L,"e" )
) );
JavaRDD<Row> edgeRow = ctx.parallelize( Arrays.asList( 
RowFactory.create( 1L,2L,10.0 ),
RowFactory.create( 2L,3L,20.0 ),
RowFactory.create( 2L,4L,30.0 ),
RowFactory.create( 4L,5L,80.0 ),
RowFactory.create( 1L,5L,15.0 ),
RowFactory.create( 1L,4L,30.0 )) );

GraphFrame frame = new GraphFrame( sqlCtx.createDataFrame( verticeRow, vType ), sqlCtx.createDataFrame( edgeRow, eType ) );
GraphFrame resultFrmae = caleMiniTree( frame );
resultFrmae.edges( ).show( false );

ctx.stop( );
}

private static GraphFrame caleMiniTree(GraphFrame frame)
{
List<StructField> eList = new ArrayList<StructField>();

eList.add( DataTypes.createStructField( "src", DataTypes.LongType, false ) );
eList.add( DataTypes.createStructField( "dst", DataTypes.LongType, false ) );
eList.add( DataTypes.createStructField( "weight", DataTypes.DoubleType, true ) );
eList.add( DataTypes.createStructField( "marked", DataTypes.BooleanType, true ) );


StructType newType = DataTypes.createStructType( eList );

JavaRDD<Row> newEdges = frame.edges( ).toJavaRDD( ).map( row -> RowFactory.create( row.getLong( 0 ), row.getLong( 1 ), row.getDouble( 2 ), false ));

GraphFrame g2 = new GraphFrame(frame.vertices( ), sqlCtx.createDataFrame( newEdges, newType ));
long count = g2.vertices( ).count( );
while(count != 0)
{
count --;
GraphFrame subG2 = new GraphFrame(g2.vertices( ), g2.edges( ).filter( g2.edges( ).col( "marked" ).equalTo( true ) ));


DataFrame g2Vertice = g2.vertices();
DataFrame componentData = subG2.connectedComponents( ).setAlgorithm( "graphx" ).run( );

DataFrame joinData = g2Vertice.join(componentData , g2Vertice.col( "id" ).equalTo( componentData.col( "id" )), "leftouter" ).toDF( "id","name","id1", "name1", "component");
joinData = joinData.select( joinData.col( "id" ), joinData.col( "component" ) );

DataFrame g2Edge = g2.edges( );


DataFrame unavailableEdge = g2Edge.join( joinData.toDF( "src", "component1"), "src" );
unavailableEdge = unavailableEdge.join( joinData.toDF( "dst", "component2"), "dst");
unavailableEdge = unavailableEdge.filter( unavailableEdge.col( "component1" ).isNotNull( ).and( unavailableEdge.col( "component1" ).equalTo( unavailableEdge.col( "component2" ) ) ));

DataFrame caleMessage = g2Edge.join( unavailableEdge, g2Edge.col( "src" ).equalTo( unavailableEdge.col( "src" ) ).
and( g2Edge.col( "dst" ).equalTo( unavailableEdge.col( "dst" ) ) ),"leftouter").
toDF( "src","dst","weight","marked","dst1", "src1","weight1", "marked1","component1","component2" );

caleMessage = caleMessage.filter( caleMessage.col( "marked" ).equalTo( false ).
and(caleMessage.col( "component1" ).isNull( )).and( caleMessage.col( "component2" ).isNull( ) )).sort( caleMessage.col( "weight" ), caleMessage.col( "src" ));
if (caleMessage.count( ) == 0)
{
break;
}
Row row = caleMessage.first( );
final long smallSrc = row.getLong( 0 );
final long smallDst = row.getLong( 1 );

JavaRDD<Row> covertEdge = g2.edges( ).toJavaRDD( ).map( t->
{
boolean bool = t.getBoolean( 3 );
if (t.getLong( 0 ) == smallSrc && t.getLong( 1 ) == smallDst)
{
bool = true;
}
return RowFactory.create( t.getLong( 0 ), t.getLong( 1 ),t.getDouble( 2 ), bool );

});

g2 = new GraphFrame( g2.vertices( ), sqlCtx.createDataFrame( covertEdge, newType ));
}
DataFrame reltEdges = g2.edges( );
DataFrame resuleData = g2.edges( ).filter( reltEdges.col( "marked" ).equalTo( true ) ).select(reltEdges.col( "src" ), reltEdges.col( "dst" ), reltEdges.col( "weight" ) );

GraphFrame retValue = new GraphFrame( g2.vertices( ),resuleData );
return retValue;
}

}


猜你喜欢

转载自blog.csdn.net/hhtop112408/article/details/77851948