欢迎访问: 西北工业大学 - 大数据与知识管理研究室 (Northwestern Polytechnical University - BigData and Knowledge Management Lab),链接:http://wowbigdata.cn/,http://wowbigdata.net.cn/,http://wowbigdata.com.cn。
环境:在单机上(机器名:giraphx)启动了2个workers。
输入:SSSP文件夹,里面有1.txt和2.txt两个文件
1. 在Worker向Master汇报健康状况后,就开始等待Master创建InputSplit。方法:每个Worker通过检某个Znode节点是否存在,同时在此Znode上设置Watcher。若不存在,就通过BSPEvent的waitForever()方法释放当前线程的锁,陷入等待状态。一直等到master创建该znode。此步骤位于BSPServiceWorker类中的startSuperStep方法中,等待代码如下:
//Znode的路径
String addressesAndPartitionsPath =
getAddressesAndPartitionsPath(getApplicationAttempt(),
getSuperstep());
//把该znode的data读入到addressesAndPartitions中
AddressesAndPartitionsWritable addressesAndPartitions =
new AddressesAndPartitionsWritable(
workerGraphPartitioner.createPartitionOwner().getClass());
//当master创建该znode后,退出while循环
while (getZkExt().exists(addressesAndPartitionsPath, true) ==
null) {
//陷入等待状态
getAddressesAndPartitionsReadyChangedEvent().waitForever();
//当master创建该znode后,触发Watcher。调用process进而唤醒线程
getAddressesAndPartitionsReadyChangedEvent().reset();
}
//读入数据
WritableUtils.readFieldsFromZnode(
getZkExt(),
addressesAndPartitionsPath,
false,
null,
addressesAndPartitions);
2. Master调用createInputSplits()方法创建InputSplit。
在generateInputSplits()方法中,根据用户设定的VertexInputFormat获得InputSplits。代码如下:
List<InputSplit> splits=inputFormat.getSplits(getContext(), minSplitCountHint);
minSplitCountHint = Workers数目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每个Input split loading的线程数目,默认值为1 。
经查证,在TextVertexValueInputFormat抽象类中的getSplits()方法中的minSplitCountHint参数被忽略。用户输入的VertexInputFormat继承TextVertexValueInputFormat抽象类。
如果得到的splits.size小于minSplitCountHint,那么有些worker就没被用上。
得到split信息后,要把这些信息写到Zookeeper上,以便其他workers访问。上面得到的split信息如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]遍历splits List,为每个split创建一个Znode,值为split的信息。如为split-0创建Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
为split-1创建znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1最后创建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都创建好了。
3. Master根据splits创建Partitions。首先确定partition的数目。
BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>对象默认为HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
@Override
public Collection<PartitionOwner> createInitialPartitionOwners(
Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
//maxWorkers为Workers的最大数目,用户通过 -w 指定。实验时指定为 2
//availableWorkerInfos为健康的Workers列表,此处为:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)]
int partitionCount = PartitionUtils.computePartitionCount(
availableWorkerInfos, maxWorkers, conf);
List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
//为每个Partition指定一个PartitionOwner,表示该Partition的元数据信息
for (int i = 0; i < partitionCount; ++i) {
PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
//若遍历完availableWorkerInfos,则开始下一轮遍历。
if (!workerIt.hasNext()) {
workerIt = availableWorkerInfos.iterator();
}
ownerList.add(owner);
}
this.partitionOwnerList = ownerList;
return ownerList;
}
上面代码中是在工具类PartitionUtils计算Partition的数目,计算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默认值为1 。
可见,partitionCount值为4(1*2*2)。创建的partitionOwnerList信息如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4. Master创建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir ,用于后面的exchange partition。
5. Master最后在assignPartitionOwners()方法中,把masterinfo,chosenWorkerInfoList,partitionOwners等信息写入Znode中(作为Znode的data),该Znode的路径为: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
Master调用barrierOnWorkerList()方法开始等待各个Worker完成数据加载。调用关系如下:
6. 当Master创建第5步的znode后,会激活worker。每个worker从znode上读出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各个worker开始加载数据。
把partitionOwnerList复制给BSPServiceWorker类中的workerGraphPartitioner(默认为HashWorkerPartitioner类型)对象的partitionOwnerList变量,后续每个顶点把根据vertexID通过workerGraphPartitioner对象获取其对应的partitionOwner.
每个Worker从znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir获取子节点,得到inputSplitPathList,内容如下:
[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]
然后每个Worker创建N个InputsCallable线程读取数据。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默认值为1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1
那么,默认每个worker就是创建一个线程来加载数据。
在InputSplitsHandler类中的reserveInputSplit()方法中,每个worker都是遍历inputSplitPathList,通过创建znode来保留(标识要处理)的split。代码及注释如下:
public String reserveInputSplit() {
String reservedInputSplitPath;
Stat reservedStat;
while (true) {
//currentIndex递增,要遍历完pathList
int splitToTry = currentIndex.getAndIncrement();
//遍历完pathList,说明所有的split都被处理了,退出while循环。
if (splitToTry >= pathList.size()) {
return null;
}
//得到split的znode path,如/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
String nextSplitToClaim = pathList.get(splitToTry);
//构造znode路径,如: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1/_vertexInputSplitReserved
String tmpInputSplitReservedPath = nextSplitToClaim + inputSplitReservedNode;
//检测znode是否存在。若存在,说明该split已经被其他worker处理了。设置watcher是为了容错,可暂时忽略。
reservedStat =
zooKeeper.exists(tmpInputSplitReservedPath, this);
//若不存在,说明该znode对应的split还没有被处理掉。但有可能其他worker也在申请处理当前znode,
//所以下面创建znode时,可能会出现KeeperException.NodeExistsException异常。
if (reservedStat == null) {
try {
// Attempt to reserve this InputSplit
//若成功创建,那么当前worker就出该split
zooKeeper.createExt(tmpInputSplitReservedPath,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
false);
reservedInputSplitPath = nextSplitToClaim;
return reservedInputSplitPath;
} catch (KeeperException.NodeExistsException e) {
LOG.info("reserveInputSplit: Couldn't reserve " +
"(already reserved) inputSplit" +
" at " + tmpInputSplitReservedPath);
}
}
}
//inputSplitPath为znode的path,如: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
private VertexEdgeCount loadInputSplit(
String inputSplitPath,
GraphState<I, V, E, M> graphState) {
//获取该znode对应的InputSplit信息,如得到:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
InputSplit inputSplit = getInputSplit(inputSplitPath);
//从split中一行一行读入数据,把每行数据创建成一个vertex。
//然后根据vertexId把vertex发送到相应的partition上(数据重分布过程)
VertexEdgeCount vertexEdgeCount =
readInputSplit(inputSplit, graphState);
//处理完当前split后,创建结束znode标识该split已被处理掉。znode的path为: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1/_vertexInputSplitFinished
splitsHandler.markInputSplitPathFinished(inputSplitPath);
return vertexEdgeCount;
}
VertexInputSplitsCallable类的readInputSplit()方法如下:
protected VertexEdgeCount readInputSplit(
InputSplit inputSplit,
GraphState<I, V, E, M> graphState)
throws IOException, InterruptedException {
//获取用户输入的InputFormat类
VertexInputFormat<I, V, E> vertexInputFormat =
configuration.createVertexInputFormat();
VertexReader<I, V, E> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, context);
vertexReader.setConf(
(ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration);
vertexReader.initialize(inputSplit, context);
long inputSplitVerticesLoaded = 0;
long edgesSinceLastUpdate = 0;
long inputSplitEdgesLoaded = 0;
while (vertexReader.nextVertex()) {
//获取vertex
Vertex<I, V, E, M> readerVertex =
(Vertex<I, V, E, M>) vertexReader.getCurrentVertex();
if (readerVertex.getId() == null) {
throw new IllegalArgumentException(
"readInputSplit: Vertex reader returned a vertex " +
"without an id! - " + readerVertex);
}
if (readerVertex.getValue() == null) {
readerVertex.setValue(configuration.createVertexValue());
}
readerVertex.setConf(configuration);
readerVertex.setGraphState(graphState);
//根据vertexID获取其partitionOwner
PartitionOwner partitionOwner =
bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
//把顶点发送到对应的partition上
graphState.getWorkerClientRequestProcessor().sendVertexRequest(
partitionOwner, readerVertex);
context.progress(); // do this before potential data transfer
++inputSplitVerticesLoaded;
edgesSinceLastUpdate += readerVertex.getNumEdges();
}
vertexReader.close();
return new VertexEdgeCount(inputSplitVerticesLoaded,
inputSplitEdgesLoaded + edgesSinceLastUpdate);
}
策略如下,每个worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目录下创建子节点,后面追加自己的worker信息,如worker1、worker2创建的子节点分别如下:
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2,创建完后,然后等待master创建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。
8.从第5步骤可知,若master发现/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子节点数目等于workers的总数目,就会在coordinateInputSplits()方法中创建
_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告诉每个worker,所有的worker都处理完了split。
9. 最后就是就行全局同步。
master创建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再调用barrierOnWorkerList方法检查该znode的子节点数目是否等于workers的数目,若不等于,则线程陷入等待状态。等待worker创建子节点来激活该线程继续判断。
每个worker获取自身的Partition Stats,进入finishSuperStep方法中,等待所有的Request都被处理完;把自身的Aggregator信息发送给master;创建子节点,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data为该worker的partitionStatsList和workerSentMessages统计量;
最后调用waitForOtherWorkers()方法等待master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点。
master发现/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子节点数目等于workers数目后,根据/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子节点上的data收集每个worker发送的aggregator信息,汇总为globalStats。
Master若发现全局信息中(1)所有顶点都voteHalt且没有消息传递,或(2)达到最大迭代次数 时,设置 globalStats.setHaltComputation(true)。告诉works结束迭代。
master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点,data为globalStats。告诉所有workers当前超级步结束。
每个Worker检测到master创建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点后,读出该znode的数据,即全局的统计信息。然后决定是否继续下一次迭代。
10. 同步之后开始下一个超级步。
11.master和workers同步过程总结。
(1)master创建znode A,然后检测A的子节点数目是否等于workers数目,不等于就陷入等待。某个worker创建一个子节点后,就会唤醒master进行检测一次。
(2)每个worker进行自己的工作,完成后,创建A的子节点A1。然后等待master创建znode B。
(3)若master检测到A的子节点数目等于workers的数目时,创建Znode B
(4)master创建B 节点后,会激活各个worker。同步结束,各个worker就可以开始下一个超步。
本质是通过znode B来进行全局同步的。