Giraph Source Code Analysis (7): Adding Message Statistics

Welcome to visit: Northwestern Polytechnical University - BigData and Knowledge Management Lab. Links: http://wowbigdata.cn/, http://wowbigdata.net.cn/, http://wowbigdata.com.cn

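Purpose: A recent experiment required analyzing the number of messages Giraph sends in each superstep. Out of the box, Giraph records the execution time of each superstep, but it has no message statistics feature. This post implements one by adding to and modifying the Giraph source code.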

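1. Add a class that writes the number of messages sent in each superstep to a Hadoop Counter. Create a new class, GiraphMessages, in the org.apache.giraph.counters package to collect the message counts. The source code is as follows: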

    package org.apache.giraph.counters;  
      
    import java.util.Iterator;  
    import java.util.Map;  
      
    import org.apache.hadoop.mapreduce.Mapper.Context;  
    import com.google.common.collect.Maps;  
      
    /**
     * Hadoop Counters in group "Giraph Messages" that record the number of
     * messages sent in every superstep.
     */
    public class GiraphMessages extends HadoopCountersBase {  
        /** Counter group name for the giraph Messages */  
        public static final String GROUP_NAME = "Giraph Messages";  
      
        /** Singleton instance for everyone to use */  
        private static GiraphMessages INSTANCE;  
      
        /** Number of messages sent in each superstep, keyed by superstep */
        private final Map<Long, GiraphHadoopCounter> superstepMessages;  
      
        private GiraphMessages(Context context) {  
            super(context, GROUP_NAME);  
            superstepMessages = Maps.newHashMap();  
        }  
      
        /** 
         * Instantiate with Hadoop Context. 
         *  
         * @param context 
         *            Hadoop Context to use. 
         */  
        public static void init(Context context) {  
            INSTANCE = new GiraphMessages(context);  
        }  
      
        /** 
         * Get singleton instance. 
         *  
         * @return singleton GiraphMessages instance.
         */  
        public static GiraphMessages getInstance() {  
            return INSTANCE;  
        }  
      
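        /**
         * Get the counter that holds the message count of a superstep,
         * creating it on first use.
         *
         * @param superstep
         *            superstep number.
         * @return counter for the messages sent in that superstep.
         */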
        public GiraphHadoopCounter getSuperstepMessages(long superstep) {  
            GiraphHadoopCounter counter = superstepMessages.get(superstep);  
            if (counter == null) {  
                String counterPrefix = "Superstep- " + superstep+" ";  
                counter = getCounter(counterPrefix);  
                superstepMessages.put(superstep, counter);  
            }  
            return counter;  
        }  
      
        @Override  
        public Iterator<GiraphHadoopCounter> iterator() {  
            return superstepMessages.values().iterator();  
        }  
    }  
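
The get-or-create logic in getSuperstepMessages can be tried in isolation. The following is only a minimal sketch in plain Java, not Giraph code: the class name SuperstepMessageRegistry is hypothetical, and AtomicLong stands in for GiraphHadoopCounter so that it runs without Hadoop on the classpath.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Plain-Java stand-in for GiraphMessages (hypothetical class, for
 * illustration only). AtomicLong replaces GiraphHadoopCounter, which is an
 * assumption made so the sketch runs without Hadoop.
 */
class SuperstepMessageRegistry implements Iterable<AtomicLong> {
    /** One counter per superstep, kept in insertion order. */
    private final Map<Long, AtomicLong> superstepMessages = new LinkedHashMap<>();

    /** Return the counter for a superstep, creating it on first access. */
    AtomicLong getSuperstepMessages(long superstep) {
        return superstepMessages.computeIfAbsent(superstep, s -> new AtomicLong());
    }

    @Override
    public Iterator<AtomicLong> iterator() {
        return superstepMessages.values().iterator();
    }

    public static void main(String[] args) {
        SuperstepMessageRegistry registry = new SuperstepMessageRegistry();
        registry.getSuperstepMessages(0).addAndGet(120);
        registry.getSuperstepMessages(1).addAndGet(45);
        // A second access to superstep 0 reuses the existing counter.
        registry.getSuperstepMessages(0).addAndGet(30);
        for (AtomicLong counter : registry) {
            System.out.println(counter.get());
        }
    }
}
```

Repeated calls for the same superstep return the same counter object, so increments accumulate instead of overwriting each other. The real class follows the same pattern, except that each counter is obtained from Hadoop through getCounter.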


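2. Add the statistics to the BspServiceMaster class. At every synchronization, the master aggregates (sums) the number of messages sent by each worker and stores the result in GlobalStats. So after each synchronization it is enough to read the total message count from the GlobalStats object and write it into GiraphMessages. Each record has the form <SuperStep-Number, TotalMessagesCount> and is stored in the Map<Long, GiraphHadoopCounter> superstepMessages object defined in the GiraphMessages class from the previous step.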

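At the very end of the BspServiceMaster constructor, append one line that initializes GiraphMessages (BspServiceMaster also needs to import org.apache.giraph.counters.GiraphMessages):

    GiraphMessages.init(context);

Then add the recording logic to the SuperstepState coordinateSuperstep() method of BspServiceMaster. The relevant fragment is: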

    ……  
    // If the master is halted or all the vertices voted to halt and there  
    // are no more messages in the system, stop the computation  
    GlobalStats globalStats = aggregateWorkerStats(getSuperstep());    
      
    LOG.info("D-globalStats: " + globalStats + "\n\n");
    // Add the following statement. Recording starts from superstep 0.
    if (getSuperstep() != INPUT_SUPERSTEP) {
        GiraphMessages.getInstance().getSuperstepMessages(getSuperstep())
            .increment(globalStats.getMessageCount());
    }
    ……  
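
The bookkeeping that the master performs in this step can be modeled roughly as follows. This is a simplified sketch in plain Java, not the Giraph implementation: MasterMessageAccounting, aggregate and record are hypothetical names, the per-worker counts are passed in as a plain array instead of being read from GlobalStats, and the value -1 for INPUT_SUPERSTEP is an assumption about Giraph's constant.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the master-side message accounting. */
class MasterMessageAccounting {
    /**
     * Marks the input-loading phase. Assumed to be -1 here; check
     * BspService.INPUT_SUPERSTEP in your Giraph version.
     */
    static final long INPUT_SUPERSTEP = -1L;

    /** Total message count per superstep, sorted by superstep number. */
    private final Map<Long, Long> totals = new TreeMap<>();

    /** Sum the per-worker message counts, as the master does at each synchronization. */
    static long aggregate(long[] workerMessageCounts) {
        long sum = 0L;
        for (long count : workerMessageCounts) {
            sum += count;
        }
        return sum;
    }

    /** Record the total for a superstep, skipping the input superstep. */
    void record(long superstep, long[] workerMessageCounts) {
        if (superstep == INPUT_SUPERSTEP) {
            return;
        }
        totals.merge(superstep, aggregate(workerMessageCounts), Long::sum);
    }

    Map<Long, Long> totals() {
        return totals;
    }

    public static void main(String[] args) {
        MasterMessageAccounting accounting = new MasterMessageAccounting();
        accounting.record(INPUT_SUPERSTEP, new long[] {0L, 0L}); // ignored
        accounting.record(0L, new long[] {3L, 4L});
        accounting.record(1L, new long[] {10L});
        System.out.println(accounting.totals());
    }
}
```

Skipping the input superstep keeps the graph-loading phase out of the statistics, so the first recorded entry is superstep 0, matching the comment in the fragment above.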

3. The experimental results are as follows:


The end!
