Sometimes a large graph needs to be pruned before further processing. One typical scenario: given a set of seed vertices, extract the subgraph formed by all vertices within N hops of the seeds. This can be implemented with the Pregel API in GraphX.
Approach: set the attribute of each seed vertex to 1 and of each non-seed vertex to 0, then iterate with Pregel. In every iteration each vertex sends its attribute value to its neighbors, and each vertex adds the values it receives to its own attribute. The first iteration reaches the 1-hop neighbors, the second reaches the 2-hop neighbors (1-hop neighbors receive values again, but this does not affect the outcome), and so on; after N iterations every vertex within N hops of a seed has received a positive value. The subgraph formed by the vertices whose attribute value is greater than 0 is then the pruned target subgraph.
The code is as follows:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GraphXExample {
  def main(args: Array[String]): Unit = {
    // N: how many hops away from the seed vertices to keep (here N = 2)
    val maxIterations = 2

    val spark = SparkSession
      .builder
      .appName("GraphExample")
      .master("local[4]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Seed vertices (2, 3, 4) carry attribute 1; all other vertices carry 0
    val vertexArray: Array[(Long, Int)] = Array(
      (1L, 0), (2L, 1), (3L, 1), (4L, 1), (5L, 0), (6L, 0), (7L, 0),
      (8L, 0), (9L, 0), (10L, 0), (11L, 0), (12L, 0), (13L, 0)
    )
    // A chain 1-2-...-10 plus branches 2-11 and 4-12-13
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1), Edge(4L, 5L, 1),
      Edge(5L, 6L, 1), Edge(6L, 7L, 1), Edge(7L, 8L, 1), Edge(8L, 9L, 1),
      Edge(9L, 10L, 1), Edge(2L, 11L, 1), Edge(4L, 12L, 1), Edge(12L, 13L, 1)
    )

    val vertexRDD: RDD[(Long, Int)] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    val graph: Graph[Int, Int] = Graph(vertexRDD, edgeRDD)

    // Send each endpoint's attribute to the other endpoint, i.e. treat the
    // graph as undirected so values spread one hop per iteration
    def sendMessage(edge: EdgeTriplet[Int, Int]): Iterator[(VertexId, Int)] =
      Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr))

    // Merge concurrent messages to the same vertex by summing them
    def messageCombiner(a: Int, b: Int): Int = a + b

    // An initial message of 0 leaves every vertex's attribute unchanged
    val initialMessage = 0

    // Each vertex accumulates the values received from its neighbors
    def vprog(vid: VertexId, vdata: Int, msg: Int): Int = vdata + msg

    val newGraph = Pregel(graph, initialMessage, maxIterations)(
      vprog, sendMessage, messageCombiner)

    // Vertices with a positive value are within N hops of a seed vertex
    newGraph.vertices.filter {
      case (vid, vdata) => vdata > 0
    }.sortByKey().collect().foreach(println)

    spark.stop()
  }
}
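Note that the listing above only prints the qualifying vertices; it does not materialize the pruned subgraph itself. For that, GraphX's subgraph operator can be applied to the Pregel result. A minimal sketch (prunedGraph is a hypothetical name; newGraph is the Pregel output from the listing above):

// Keep only vertices whose accumulated value is positive; subgraph
// automatically drops every edge that loses an endpoint
val prunedGraph: Graph[Int, Int] =
  newGraph.subgraph(vpred = (vid, vdata) => vdata > 0)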
The result is as follows. With maxIterations = 2 and seeds 2, 3, 4, exactly the vertices within two hops of a seed remain (vertices 7, 8, 9, and 10 are dropped):
(1,3)
(2,7)
(3,7)
(4,7)
(5,3)
(6,1)
(11,3)
(12,3)
(13,1)
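A possible refinement, not part of the original listing: since a message carrying 0 contributes nothing under vdata + msg, sendMessage can skip endpoints whose value is still 0. On a large graph this reduces message traffic while leaving the accumulated values unchanged. A sketch under that assumption, as a drop-in replacement for the sendMessage above:

// Forward an endpoint's attribute only when it is positive; zero-valued
// messages are no-ops for vdata + msg, so the final values are identical
def sendMessage(edge: EdgeTriplet[Int, Int]): Iterator[(VertexId, Int)] = {
  val toDst = if (edge.srcAttr > 0) Iterator((edge.dstId, edge.srcAttr)) else Iterator.empty
  val toSrc = if (edge.dstAttr > 0) Iterator((edge.srcId, edge.dstAttr)) else Iterator.empty
  toDst ++ toSrc
}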