Giraph Source Code Analysis (8): Counting the Vertices Computed in Each Superstep

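Goal: in my research I need to know how many vertices take part in the computation in each iteration, so that the system can be optimized further. In SSSP, for example, the last line of compute() always calls voteToHalt() on the current vertex, which makes it inactive. So when an iteration finishes, every vertex is inactive. After the global synchronization barrier, each vertex that received a message is reactivated and its compute() method is called. This post shows how to count the vertices that take part in each iteration. The SSSP compute() method is shown below: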

    @Override  
      public void compute(Iterable<DoubleWritable> messages) {  
        if (getSuperstep() == 0) {  
          setValue(new DoubleWritable(Double.MAX_VALUE));  
        }  
        double minDist = isSource() ? 0d : Double.MAX_VALUE;  
        for (DoubleWritable message : messages) {  
          minDist = Math.min(minDist, message.get());  
        }  
        if (minDist < getValue().get()) {  
          setValue(new DoubleWritable(minDist));  
          for (Edge<LongWritable, FloatWritable> edge : getEdges()) {  
            double distance = minDist + edge.getValue().get();  
            sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));  
          }  
        }  
        // Mark the vertex as inactive
        voteToHalt();  
      }  

Note: a Giraph job terminates when there are no active vertices and no messages in flight between workers.

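In hama-0.6.0, the termination condition only checks whether any active vertices remain. That is not the real Pregel model; it is a half-finished implementation.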
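
The two termination rules can be contrasted with a minimal sketch. The class and method names here (`HaltCheck`, `giraphStyleHalt`, `activeOnlyHalt`) are hypothetical and only model the conditions as described in this post; they are not Giraph or Hama APIs.

```java
/** Hypothetical model of the two termination rules; not part of Giraph or Hama. */
class HaltCheck {
    /** Halt only when every vertex has voted to halt AND no messages were sent. */
    static boolean giraphStyleHalt(long vertexCount, long finishedVertexCount,
                                   long messagesSent) {
        return finishedVertexCount == vertexCount && messagesSent == 0;
    }

    /** Halt as soon as no vertex is active, ignoring pending messages. */
    static boolean activeOnlyHalt(long vertexCount, long finishedVertexCount) {
        return finishedVertexCount == vertexCount;
    }

    public static void main(String[] args) {
        // SSSP-like situation: all 9 vertices voted to halt, 4 messages pending.
        System.out.println(giraphStyleHalt(9, 9, 4)); // prints false
        System.out.println(activeOnlyHalt(9, 9));     // prints true
    }
}
```

With SSSP every vertex votes to halt at the end of compute(), so a rule that looks only at active vertices would stop after the first superstep even though messages are still waiting to be delivered.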

The modifications are as follows:

1. The org.apache.giraph.partition.PartitionStats class

Add a field and methods that count the vertices each Partition computes in each superstep:

    /** computed vertices in this partition */  
    private long computedVertexCount=0;  
      
    /** 
    * Increment the computed vertex count by one. 
    */  
    public void incrComputedVertexCount() {  
        ++ computedVertexCount;  
    }  
      
    /** 
     * @return the computedVertexCount 
     */  
    public long getComputedVertexCount() {  
        return computedVertexCount;  
    }  

Modify readFields() and write() by appending one statement to the end of each method. When a Partition finishes computing, it sends its computedVertexCount to the Master, which reads and aggregates the values. The new field must be read in the same position in which it is written.

    @Override
    public void readFields(DataInput input) throws IOException {
        partitionId = input.readInt();
        vertexCount = input.readLong();
        finishedVertexCount = input.readLong();
        edgeCount = input.readLong();
        messagesSentCount = input.readLong();
        // Add the following statement
        computedVertexCount = input.readLong();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(partitionId);
        output.writeLong(vertexCount);
        output.writeLong(finishedVertexCount);
        output.writeLong(edgeCount);
        output.writeLong(messagesSentCount);
        // Add the following statement
        output.writeLong(computedVertexCount);
    }
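
Why the two methods must stay symmetric can be checked with a small round trip. This is a sketch under stated assumptions: `MiniPartitionStats` is a hypothetical stand-in with only three fields, and it uses plain java.io streams instead of Hadoop's Writable machinery.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for PartitionStats; uses only java.io, no Hadoop. */
class MiniPartitionStats {
    int partitionId;
    long vertexCount;
    long computedVertexCount;

    void write(DataOutput output) throws IOException {
        output.writeInt(partitionId);
        output.writeLong(vertexCount);
        output.writeLong(computedVertexCount); // appended last
    }

    void readFields(DataInput input) throws IOException {
        partitionId = input.readInt();
        vertexCount = input.readLong();
        computedVertexCount = input.readLong(); // read last, mirroring write()
    }

    /** Serialize to bytes, then deserialize into a fresh object. */
    static MiniPartitionStats roundTrip(MiniPartitionStats in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            MiniPartitionStats out = new MiniPartitionStats();
            out.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MiniPartitionStats stats = new MiniPartitionStats();
        stats.partitionId = 3;
        stats.vertexCount = 9;
        stats.computedVertexCount = 4;
        MiniPartitionStats copy = roundTrip(stats);
        System.out.println(copy.partitionId + " " + copy.vertexCount
                + " " + copy.computedVertexCount); // prints 3 9 4
    }
}
```

If write() emitted the new field but readFields() did not consume it (or the order differed), the receiver would decode the wrong bytes for every field that follows.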


2. The org.apache.giraph.graph.GlobalStats class

Add a field and a method that hold the total number of vertices computed in each superstep, summed over all Partitions on every Worker.

    /**
     * Computed vertices across all partitions.
     * Added by BaiSong
     */
    private long computedVertexCount = 0;

    /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
        return computedVertexCount;
    }
 

Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.

    /**
     * Add the stats of a partition to the global stats.
     *
     * @param partitionStats Partition stats to be added.
     */
    public void addPartitionStats(PartitionStats partitionStats) {
        this.vertexCount += partitionStats.getVertexCount();
        this.finishedVertexCount += partitionStats.getFinishedVertexCount();
        this.edgeCount += partitionStats.getEdgeCount();
        // Added by BaiSong: add the following statement
        this.computedVertexCount += partitionStats.getComputedVertexCount();
    }

To make debugging easier, you can also modify this class's toString() method (optional). The modified version is:

    @Override
    public String toString() {
        return "(vtx=" + vertexCount + ", computedVertexCount="
                + computedVertexCount + ",finVtx=" + finishedVertexCount
                + ",edges=" + edgeCount + ",msgCount=" + messageCount
                + ",haltComputation=" + haltComputation + ")";
    }
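
The aggregation step amounts to summing per-partition numbers into one global total. A minimal sketch, assuming hypothetical `PartStats` and `GlobalTotals` classes that carry only two of the fields, could look like this:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of master-side aggregation; not Giraph code. */
class AggregationSketch {
    /** Per-partition numbers reported at the end of a superstep. */
    static final class PartStats {
        final long vertexCount;
        final long computedVertexCount;

        PartStats(long vertexCount, long computedVertexCount) {
            this.vertexCount = vertexCount;
            this.computedVertexCount = computedVertexCount;
        }
    }

    /** Running totals over all partitions of all workers. */
    static final class GlobalTotals {
        long vertexCount;
        long computedVertexCount;

        void add(PartStats stats) {
            vertexCount += stats.vertexCount;
            computedVertexCount += stats.computedVertexCount;
        }

        @Override
        public String toString() {
            return "(vtx=" + vertexCount
                    + ",computedVertexCount=" + computedVertexCount + ")";
        }
    }

    static GlobalTotals aggregate(List<PartStats> partitions) {
        GlobalTotals totals = new GlobalTotals();
        for (PartStats stats : partitions) {
            totals.add(stats);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Three partitions holding 9 vertices in total; 4 of them computed.
        List<PartStats> partitions = Arrays.asList(
                new PartStats(3, 3), new PartStats(4, 1), new PartStats(2, 0));
        System.out.println(aggregate(partitions)); // prints (vtx=9,computedVertexCount=4)
    }
}
```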

3. The org.apache.giraph.graph.ComputeCallable<I,V,E,M> class

Add the counting logic: in the computePartition() method, add the statement marked below.

    if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
            // Add the following statement: once the vertex's compute() returns,
            // increment this Partition's computedVertexCount by 1
            partitionStats.incrComputedVertexCount();
        } finally {
            computeOneTimerContext.stop();
        }
    ……
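
The effect of that one statement can be illustrated outside Giraph. The following is a simplified sketch, not the real computePartition(): `MiniVertex` and the `idsWithMessages` set are hypothetical, and compute() only models the SSSP behavior of always voting to halt.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the per-partition compute loop; not Giraph code. */
class ComputeLoopSketch {
    static final class MiniVertex {
        final long id;
        boolean halted;

        MiniVertex(long id, boolean halted) {
            this.id = id;
            this.halted = halted;
        }

        /** Like SSSP, always votes to halt at the end of compute(). */
        void compute() {
            halted = true;
        }
    }

    /** Runs one pass over the partition; returns how many vertices computed. */
    static long computePartition(List<MiniVertex> partition,
                                 Set<Long> idsWithMessages) {
        long computedVertexCount = 0;
        for (MiniVertex vertex : partition) {
            // A halted vertex that received a message becomes active again.
            if (vertex.halted && idsWithMessages.contains(vertex.id)) {
                vertex.halted = false;
            }
            if (!vertex.halted) {
                vertex.compute();
                computedVertexCount++; // the counter added in this post
            }
        }
        return computedVertexCount;
    }

    public static void main(String[] args) {
        List<MiniVertex> partition = Arrays.asList(
                new MiniVertex(0, true), new MiniVertex(1, true),
                new MiniVertex(2, true), new MiniVertex(3, true));
        Set<Long> idsWithMessages = new HashSet<>(Arrays.asList(1L, 3L));
        System.out.println(computePartition(partition, idsWithMessages)); // prints 2
    }
}
```

Only vertices that are active when the loop reaches them are counted, so halted vertices without incoming messages never touch the counter.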

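4. Add the Counters statistics. This is similar to my earlier post, Giraph Source Code Analysis (7): Adding Message Statistics, so I will not go into detail here. The added class is org.apache.giraph.counters.GiraphComputedVertex; its source follows:
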
    package org.apache.giraph.counters;  
      
    import java.util.Iterator;  
    import java.util.Map;  
      
    import org.apache.hadoop.mapreduce.Mapper.Context;  
    import com.google.common.collect.Maps;  
      
    /**
     * Hadoop Counters in group "Giraph Computed Vertex" for counting the
     * vertices computed in every superstep.
     */
      
    public class GiraphComputedVertex extends HadoopCountersBase {  
        /** Counter group name for the Giraph computed-vertex counters */
        public static final String GROUP_NAME = "Giraph Computed Vertex";  
      
        /** Singleton instance for everyone to use */  
        private static GiraphComputedVertex INSTANCE;  
      
        /** Computed vertex count for each superstep */
        private final Map<Long, GiraphHadoopCounter> superstepVertexCount;  
      
        private GiraphComputedVertex(Context context) {  
            super(context, GROUP_NAME);  
            superstepVertexCount = Maps.newHashMap();  
        }  
      
        /** 
         * Instantiate with Hadoop Context. 
         *  
         * @param context 
         *            Hadoop Context to use. 
         */  
        public static void init(Context context) {  
            INSTANCE = new GiraphComputedVertex(context);  
        }  
      
        /** 
         * Get singleton instance. 
         *  
         * @return singleton GiraphComputedVertex instance.
         */  
        public static GiraphComputedVertex getInstance() {  
            return INSTANCE;  
        }  
      
        /**
         * Get the counter of computed vertices for a superstep.
         *
         * @param superstep Superstep number.
         * @return Counter for that superstep, created on first use.
         */
        public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {  
            GiraphHadoopCounter counter = superstepVertexCount.get(superstep);  
            if (counter == null) {  
                String counterPrefix = "Superstep: " + superstep+" ";  
                counter = getCounter(counterPrefix);  
                superstepVertexCount.put(superstep, counter);  
            }  
            return counter;  
        }  
      
        @Override  
        public Iterator<GiraphHadoopCounter> iterator() {  
            return superstepVertexCount.values().iterator();  
        }  
    }  
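
The lazy-creation pattern in getSuperstepVertexCount() can be tried outside Hadoop with a minimal sketch. `SuperstepCounterRegistry` is a hypothetical name, and AtomicLong stands in for GiraphHadoopCounter. This post does not show where the master updates the counter, so the loop in main() is only an assumed usage; the six values are the ones reported in the experiment in step 5.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-superstep counter registry; not Giraph code. */
class SuperstepCounterRegistry {
    /** One counter per superstep, created the first time it is requested. */
    private final Map<Long, AtomicLong> superstepVertexCount = new TreeMap<>();

    AtomicLong getSuperstepVertexCount(long superstep) {
        return superstepVertexCount.computeIfAbsent(superstep, s -> new AtomicLong());
    }

    int size() {
        return superstepVertexCount.size();
    }

    public static void main(String[] args) {
        SuperstepCounterRegistry registry = new SuperstepCounterRegistry();
        long[] computedPerSuperstep = {9, 4, 4, 3, 4, 0};
        for (int s = 0; s < computedPerSuperstep.length; s++) {
            registry.getSuperstepVertexCount(s).addAndGet(computedPerSuperstep[s]);
        }
        for (Map.Entry<Long, AtomicLong> e : registry.superstepVertexCount.entrySet()) {
            System.out.println("Superstep: " + e.getKey() + " = " + e.getValue().get());
        }
    }
}
```

Requesting the same superstep twice returns the same counter object, which is what lets several partitions' numbers accumulate into one value.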


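5. Experimental results. After the job runs, the terminal prints the total number of vertices computed in each iteration. I tested SSSP (the SimpleShortestPathsVertex class) on an input graph with 9 vertices and 12 edges.

The test ran for 6 iterations. The counters (outlined in red in the original screenshot) show the number of vertices computed in each iteration: 9, 4, 4, 3, 4, 0.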

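Explanation: in superstep 0 every vertex is active, so all 9 vertices take part in the computation. In superstep 5 no vertex computes, so no messages are sent; since every vertex is also inactive, the algorithm terminates.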
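
The shape of such a result (all vertices in superstep 0, then only message recipients, then 0) can be reproduced with a toy simulation. This is a sketch under stated assumptions: `SsspSuperstepSim` is hypothetical, it runs on its own 4-vertex graph rather than the 9-vertex test graph, and it simply stops after the first superstep in which no vertex computes, which is a simplification and not Giraph's master logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-process model of SSSP supersteps; not Giraph code. */
class SsspSuperstepSim {
    /**
     * @param edges edges[v] lists the out-edges of v as {target, weight}
     * @return number of vertices that computed in each superstep
     */
    static List<Integer> run(double[][][] edges, int source) {
        int n = edges.length;
        double[] value = new double[n];
        Arrays.fill(value, Double.MAX_VALUE);
        Map<Integer, List<Double>> inbox = new HashMap<>();
        List<Integer> computedPerSuperstep = new ArrayList<>();
        for (long superstep = 0; ; superstep++) {
            Map<Integer, List<Double>> outbox = new HashMap<>();
            int computed = 0;
            for (int v = 0; v < n; v++) {
                List<Double> messages = inbox.get(v);
                // Superstep 0: every vertex is active. Later: only recipients.
                if (superstep > 0 && messages == null) {
                    continue;
                }
                computed++;
                double minDist = (v == source) ? 0d : Double.MAX_VALUE;
                if (messages != null) {
                    for (double message : messages) {
                        minDist = Math.min(minDist, message);
                    }
                }
                if (minDist < value[v]) {
                    value[v] = minDist;
                    for (double[] edge : edges[v]) {
                        outbox.computeIfAbsent((int) edge[0], k -> new ArrayList<>())
                                .add(minDist + edge[1]);
                    }
                }
                // voteToHalt() is implicit: a vertex runs again only on a message.
            }
            computedPerSuperstep.add(computed);
            if (computed == 0) {
                break; // nothing computed, so nothing was sent
            }
            inbox = outbox; // barrier: deliver messages for the next superstep
        }
        return computedPerSuperstep;
    }

    public static void main(String[] args) {
        double[][][] edges = {
            {{1, 1.0}, {2, 4.0}}, // vertex 0 (source)
            {{2, 1.0}},           // vertex 1
            {{3, 1.0}},           // vertex 2
            {}                    // vertex 3
        };
        System.out.println(run(edges, 0)); // prints [4, 2, 2, 1, 0]
    }
}
```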

The end!
