您的位置:首页 > 编程语言 > Java开发

Graph java 实现最小生成树

2017-09-02 13:39 856 查看
算法简单描述为

1:创建一个空图。

2:把边最短的边加到图中。在选择最短边时,已经加入的不能加,边src和dst边是一个component的不能加。加入到图中的边需要打上标记。

3:循环2

代码如下

import java.io.Serializable;

import java.util.Arrays;

import java.util.Comparator;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.graphx.Edge;

import org.apache.spark.graphx.EdgeTriplet;

import org.apache.spark.graphx.Graph;

import org.apache.spark.graphx.Graph$;

import org.apache.spark.graphx.VertexRDD;

import org.apache.spark.rdd.RDD;

import org.apache.spark.storage.StorageLevel;

import com.google.common.base.Optional;

import scala.Function1;

import scala.Option;

import scala.Tuple2;

import scala.reflect.ClassManifestFactory;

import scala.reflect.ClassTag;

import scala.runtime.AbstractFunction0;

import scala.runtime.AbstractFunction1;

import scala.runtime.AbstractFunction2;

import scala.runtime.AbstractFunction3;

import scala.runtime.BoxedUnit;

/**

 * @param <T>

 * 

 */

public class MiniTreeTest

{

private static final ClassTag<Integer> tagInteger = ClassManifestFactory.classType( Integer.class );
private static final ClassTag<String> tagString = ClassManifestFactory.classType( String.class );
private static final ClassTag<Object> tagObject = ClassManifestFactory.classType( Object.class );
private static final ClassTag<Double> tagDouble = ClassManifestFactory.classType( Double.class );
private static final ClassTag<Tuple2<Object, Double>> tagTuple2 = ClassManifestFactory.classType( Tuple2.class );
private static final ClassTag<Tuple2<Boolean, Double>> tagTuple2Boolean = ClassManifestFactory.classType( Tuple2.class );

public static void main( String[] args )
{
SparkConf conf = new SparkConf( ).setAppName( "Graph short path" ).setMaster( "local" );
JavaSparkContext ctx = new JavaSparkContext( conf );

JavaRDD<Tuple2<Object, String>> verts = ctx.parallelize( Arrays.asList( 
new Tuple2<Object, String>( 1L, "a" ),
new Tuple2<Object, String>( 2L, "b" ), 
new Tuple2<Object, String>( 3L, "c" ), 
new Tuple2<Object, String>( 4L, "d" ), 
new Tuple2<Object, String>( 5L, "e" ) ) );

JavaRDD<Edge<Double>> edges = ctx.parallelize( Arrays.asList( 
new Edge<Double>( 1L, 2L, 10.0 ), 
new Edge<Double>( 2L, 3L, 20.0 ),
new Edge<Double>( 2L, 4L, 30.0 ), 
new Edge<Double>( 4L, 5L, 80.0 ), 
new Edge<Double>( 1L, 5L, 3.0 ), 
new Edge<Double>( 1L, 4L, 30.0 ) ) );

Graph<String, Double> g = Graph.apply( verts.rdd( ), edges.rdd( ), "aa", StorageLevel.MEMORY_ONLY( ), StorageLevel.MEMORY_ONLY( ), tagString, tagDouble );

Graph<String, Double> retGraph = minSpanningTree( g );

retGraph.edges( ).foreach( new MyFuction1<Edge<Double>, BoxedUnit>( )
{

@Override
pub
4000
lic BoxedUnit apply( Edge<Double> t )
{
System.out.println( t );
return BoxedUnit.UNIT;
}
} );

ctx.stop( );
}

private static <VD, ED> Graph<Object, ED>
caleConnectedComponent( Graph<VD, ED> g )
{
return Graph$.MODULE$.graphToGraphOps( g, ClassManifestFactory.classType( Object.class ), ClassManifestFactory.classType( Object.class ) ).connectedComponents( );
}

private static <VD> Graph<VD, Double> minSpanningTree( Graph<VD, Double> g )
{
Graph<VD, Tuple2<Double, Boolean>> g2 = g.mapEdges( new MyFuction1<Edge<Double>, Tuple2<Double, Boolean>>( )
{

@Override
public Tuple2<Double, Boolean> apply( Edge<Double> t )
{
return new Tuple2<Double, Boolean>( t.attr, false );
}
}, ClassManifestFactory.classType( Tuple2.class ) );

Graph<Tuple2<VD, Option<Object>>, Tuple2<Double, Boolean>> g3 = null;
long count = g.vertices( ).count( );
for ( long i = 1; i <= count; i++ )
{
Graph<VD, Tuple2<Double, Boolean>> subG2 = g2.subgraph( new MyFuction1<EdgeTriplet<VD, Tuple2<Double, Boolean>>, Object>( )
{

@Override
public Object
apply( EdgeTriplet<VD, Tuple2<Double, Boolean>> t )
{
return t.attr._2( );
}
}, new MyFuction2<Object, VD, Object>( )
{

@Override
public Object apply( Object arg0, VD arg1 )
{
return true;
}
} );
g3 = g2.outerJoinVertices( caleConnectedComponent( subG2 ).vertices( ), new MyFuction3<Object, VD, Option<Object>, Tuple2<VD, Option<Object>>>( )
{

@Override
public Tuple2<VD, Option<Object>> apply( Object t0, VD t1,
Option<Object> t2 )
{

return new Tuple2<VD, Option<Object>>( t1, t2 );
}

}, ClassManifestFactory.classType( Tuple2.class ), ClassManifestFactory.classType( Tuple2.class ), null );

g3 = g3.subgraph( new MyFuction1<EdgeTriplet<Tuple2<VD, Option<Object>>, Tuple2<Double, Boolean>>, Object>( )
{

@Override
public Object apply(
EdgeTriplet<Tuple2<VD, Option<Object>>, Tuple2<Double, Boolean>> t )
{

return t.srcAttr( )._2( ).getOrElse( new MyFuction0<Object, Object>( )
{

@Override
public Object apply( )
{
return -1;
}
} ).equals( t.dstAttr( )._2( ).getOrElse( new MyFuction0<Object, Object>( )
{

@Override
public Object apply( )
{
return -2;
}
} ) );
}
}, new MyFuction2<Object, Tuple2<VD, Option<Object>>, Object>( )
{

@Override
public Object apply( Object arg0,
Tuple2<VD, Option<Object>> arg1 )
{
return true;
}
} );

RDD<Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>> unaliableEdges = g3.edges( ).map( new MyFuction1<Edge<Tuple2<Double, Boolean>>, Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>>( )
{

@Override
public Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>
apply( Edge<Tuple2<Double, Boolean>> t )
{
return new Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>( new Tuple2<Object, Object>( t.srcId( ), t.dstId( ) ), t.attr );
}
}, ClassManifestFactory.classType( Tuple2.class ) );

RDD<Tuple2<Tuple2<Object, Object>, Double>> caleEdges = toJavaPairRDD( g2.edges( ).map( new MyFuction1<Edge<Tuple2<Double, Boolean>>, Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>>( )
{

@Override
public Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>
apply( Edge<Tuple2<Double, Boolean>> t )
{
return new Tuple2<Tuple2<Object, Object>, Tuple2<Double, Boolean>>( new Tuple2<Object, Object>( t.srcId( ), t.dstId( ) ), t.attr );
}
}, ClassManifestFactory.classType( Tuple2.class ) ) ).leftOuterJoin( toJavaPairRDD( unaliableEdges ) ).rdd( ).filter( new MyFuction1<Tuple2<Tuple2<Object, Object>, Tuple2<Tuple2<Double, Boolean>, Optional<Tuple2<Double,
Boolean>>>>, Object>( )
{

@Override
public Object apply(
Tuple2<Tuple2<Object, Object>, Tuple2<Tuple2<Double, Boolean>, Optional<Tuple2<Double, Boolean>>>> t )
{
return !t._2( )._1( )._2( )
&& !t._2( )._2( ).isPresent( ) ;
}
} ).map( new MyFuction1<Tuple2<Tuple2<Object, Object>, Tuple2<Tuple2<Double, Boolean>, Optional<Tuple2<Double, Boolean>>>>, Tuple2<Tuple2<Object, Object>, Double>>( )
{

@Override
public Tuple2<Tuple2<Object, Object>, Double> apply(
Tuple2<Tuple2<Object, Object>, Tuple2<Tuple2<Double, Boolean>, Optional<Tuple2<Double, Boolean>>>> t )
{
return new Tuple2<Tuple2<Object, Object>, Double>( t._1( ), t._2( )._1( )._1( ) );
}
}, ClassManifestFactory.classType( Tuple2.class ) );

JavaPairRDD<Tuple2<Object, Object>, Double> javaCaleEdges = toJavaPairRDD( caleEdges );
Tuple2<Tuple2<Object, Object>, Double> smallEdge;
if (javaCaleEdges.count( ) == 0)
{
smallEdge = new Tuple2<Tuple2<Object, Object>, Double>(new Tuple2<Object, Object>(-1L, -1L), -1.0);
}
else
{
smallEdge = javaCaleEdges.min( new MyComparator<Tuple2<Tuple2<Object, Object>, Double>>( )
{

@Override
public int compare( Tuple2<Tuple2<Object, Object>, Double> t1,
Tuple2<Tuple2<Object, Object>, Double> t2 )
{
if ( t1._2.equals( t2._2( ) ) )
{
return (long) t1._1( )._1( ) - (long) t2._1( )._1( ) > 0
? 1 : -1;
}
else
{
return t1._2( ) - t2._2( ) > 0 ? 1 : -1;
}
}
} );
}

g2 = g2.mapTriplets( new MyFuction1<EdgeTriplet<VD, Tuple2<Double, Boolean>>, Tuple2<Double, Boolean>>( )
{

@Override
public Tuple2<Double, Boolean>
apply( EdgeTriplet<VD, Tuple2<Double, Boolean>> t )
{

return new Tuple2<Double, Boolean>( t.attr._1, t.attr._2( )
|| ( smallEdge._1( )._1( ).equals( t.srcId( ) )
&& smallEdge._1( )._2( ).equals( t.dstId( ) ) ) );
}
}, ClassManifestFactory.classType( Tuple2.class ) );
}

Graph<VD, Double> retGraph = g2.subgraph( new MyFuction1<EdgeTriplet<VD, Tuple2<Double, Boolean>>, Object>( )
{

@Override
public Object apply( EdgeTriplet<VD, Tuple2<Double, Boolean>> t )
{
return t.attr._2( );
}
}, new MyFuction2<Object, VD, Object>( )
{

@Override
public Object apply( Object arg0, VD arg1 )
{
return true;
}
} ).mapEdges( new MyFuction1<Edge<Tuple2<Double, Boolean>>, Double>( )
{

@Override
public Double apply( Edge<Tuple2<Double, Boolean>> t )
{
return t.attr( )._1( );
}
}, tagDouble );

return retGraph;
}

private static <T> JavaPairRDD<Tuple2<Object, Object>, T>
toJavaPairRDD( RDD<Tuple2<Tuple2<Object, Object>, T>> v )
{
return new JavaPairRDD<Tuple2<Object, Object>, T>( v, ClassManifestFactory.classType( Tuple2.class ), ClassManifestFactory.classType( Object.class ) );
}

private static <T> JavaRDD<T> toJavaRDD( RDD<T> v )
{
return new JavaRDD<T>( v, ClassManifestFactory.classType( Object.class ) );
}

private static <T> JavaPairRDD<Object, T> toJavaPair( VertexRDD<T> v,
ClassTag<T> tagT )
{
return new JavaPairRDD<Object, T>( (RDD<Tuple2<Object, T>>) v, tagObject, tagT );
}

private static <T> JavaRDD<Tuple2<Object, T>> toJavaPair( VertexRDD<T> v )
{
return new JavaRDD<Tuple2<Object, T>>( (RDD<Tuple2<Object, T>>) v, ClassManifestFactory.classType( Tuple2.class ) );
}

public static abstract class MyFuction2<T1, T2, R> extends AbstractFunction2<T1, T2, R> implements Serializable
{

}

public static abstract class MyComparator<T> implements Comparator<T>, Serializable
{

}

public static abstract class MyFuction3<T1, T2, T3, R> extends AbstractFunction3<T1, T2, T3, R> implements Serializable
{

}

public static abstract class MyFuction0<T1, R> extends AbstractFunction0<R> implements Serializable
{

}

public static abstract class MyFuction1<T1, R> extends AbstractFunction1<T1, R> implements Serializable
{

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: