Each compute thread runs a ComputeCallable. It processes partitions one at a time, and processing a partition returns a PartitionStats object.
ComputeCallable is the Callable that processes partitions.
ComputeCallable.call
ComputeCallable.call has three parts:
public Collection<PartitionStats> call() {
  // first step: init variables
  // second step: process partitions
  // third step: clean up, update counters, and return the value
}
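The three-part shape of call() can be tried out in isolation with a small sketch. This is not Giraph code: ThreePhaseWorker, the integer "partitions", and the string "stats" are hypothetical stand-ins, chosen only to show several callables draining one shared source and each returning the stats it collected.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for ComputeCallable: not Giraph code.
class ThreePhaseWorker implements Callable<Collection<String>> {
  private final Queue<Integer> partitions; // shared by all workers

  ThreePhaseWorker(Queue<Integer> partitions) {
    this.partitions = partitions;
  }

  @Override
  public Collection<String> call() {
    // Step 1: initialize per-thread state.
    List<String> stats = new ArrayList<>();
    // Step 2: keep taking partitions until none are left.
    Integer id;
    while ((id = partitions.poll()) != null) {
      stats.add("partition-" + id);
    }
    // Step 3: clean up (nothing to do here) and return the collected stats.
    return stats;
  }

  // Runs numThreads workers over numPartitions ids and merges their results.
  static List<String> runAll(int numThreads, int numPartitions) {
    Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < numPartitions; i++) {
      queue.add(i);
    }
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<Collection<String>>> futures = new ArrayList<>();
      for (int t = 0; t < numThreads; t++) {
        futures.add(pool.submit(new ThreePhaseWorker(queue)));
      }
      List<String> all = new ArrayList<>();
      for (Future<Collection<String>> future : futures) {
        all.addAll(future.get());
      }
      return all;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("worker failed", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Every partition is processed by exactly one worker.
    System.out.println(runAll(3, 8).size());
  }
}
```

The number of threads is independent of the number of partitions here: a worker simply stops when the shared source is empty.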
first step: Init Variables
ComputeCallable.call first sets up the variables it needs.
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
    new NettyWorkerClientRequestProcessor<I, V, E>(
        context, configuration, serviceWorker,
        configuration.getOutgoingMessageEncodeAndStoreType().
            useOneMessageToManyIdsEncoding());
WorkerThreadGlobalCommUsage aggregatorUsage =
    serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
Computation<I, V, E, M1, M2> computation =
    (Computation<I, V, E, M1, M2>) configuration.createComputation();
computation.initialize(graphState, workerClientRequestProcessor,
    serviceWorker, aggregatorUsage);
computation.preSuperstep();
List<PartitionStats> partitionStatsList = Lists.newArrayList();
PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
second step: process partitions
The partition-processing step is an infinite loop that fetches partitions with partitionStore.getNextPartition(). The loop exits when the returned partition is null.
while (true) {
  Partition<I, V, E> partition = partitionStore.getNextPartition();
  if (partition == null) {
    break;
  }
  try {
    serviceWorker.getServerData().resolvePartitionMutation(partition);
    PartitionStats partitionStats = computePartition(
        computation, partition, oocEngine,
        serviceWorker.getConfiguration().getIncomingMessageClasses()
            .ignoreExistingVertices());
    partitionStatsList.add(partitionStats);
  } catch (IOException e) {
    // exception handling omitted in this excerpt
  } finally {
    // always hand the partition back to the store
    partitionStore.putPartition(partition);
  }
}
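The loop above follows a "check out, process, always hand back" pattern. A minimal sketch of that pattern, assuming a toy store (ToyStore and CheckoutLoopDemo are hypothetical names, not Giraph classes) and a simulated failure for one partition, shows why the hand-back lives in the finally block:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy store and loop: not Giraph's PartitionStore.
class CheckoutLoopDemo {
  static final class ToyStore {
    private final Deque<Integer> pending = new ArrayDeque<>();
    final List<Integer> returned = new ArrayList<>();

    ToyStore(int count) {
      for (int i = 0; i < count; i++) {
        pending.add(i);
      }
    }

    // Hands out the next partition id, or null when none are left.
    synchronized Integer getNextPartition() {
      return pending.poll();
    }

    // Records that a partition was handed back.
    synchronized void putPartition(Integer id) {
      returned.add(id);
    }
  }

  // Processes every partition; id 1 fails on purpose to exercise 'finally'.
  static List<Integer> processAll(ToyStore store) {
    List<Integer> processed = new ArrayList<>();
    while (true) {
      Integer id = store.getNextPartition();
      if (id == null) {
        break; // nothing left to process
      }
      try {
        if (id == 1) {
          throw new IllegalStateException("simulated failure");
        }
        processed.add(id);
      } catch (IllegalStateException e) {
        // Swallowed only to keep the demo running; real code should not do this.
      } finally {
        store.putPartition(id); // runs on success and on failure
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    ToyStore store = new ToyStore(3);
    // Partition 1 is not processed, but it is still handed back.
    System.out.println(processAll(store) + " " + store.returned);
  }
}
```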
DiskBackedPartitionStore.getNextPartition
Inside DiskBackedPartitionStore, the partitionStore field used below is the wrapped in-memory store, whose type is SimplePartitionStore.
public Partition<I, V, E> getNextPartition() {
  Integer partitionId = oocEngine.getNextPartition();
  if (partitionId == null) {
    return null;
  }
  Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
  if (partition == null) {
    if (LOG.isInfoEnabled()) {
      LOG.info("getNextPartition: partition " + partitionId + " is not in " +
          "the partition store. Creating an empty partition for it.");
    }
    partition = conf.createPartition(partitionId, context);
  }
  partitionStore.addPartition(partition);
  return partition;
}
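The method above boils down to "remove the partition, create an empty one if it was missing, add it back, return it". A minimal sketch of that shape, assuming a plain HashMap as the store and a StringBuilder as the partition (both hypothetical stand-ins, as is the class name GetOrCreateDemo):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the wrapped in-memory store: not Giraph code.
class GetOrCreateDemo {
  private final Map<Integer, StringBuilder> partitions = new HashMap<>();

  void add(int id, String contents) {
    partitions.put(id, new StringBuilder(contents));
  }

  boolean contains(int id) {
    return partitions.containsKey(id);
  }

  // Same shape as getNextPartition: remove, create if missing, re-add.
  StringBuilder fetch(Integer id) {
    if (id == null) {
      return null; // no more partitions to hand out
    }
    StringBuilder partition = partitions.remove(id);
    if (partition == null) {
      partition = new StringBuilder(); // empty placeholder partition
    }
    partitions.put(id, partition);
    return partition;
  }

  public static void main(String[] args) {
    GetOrCreateDemo store = new GetOrCreateDemo();
    store.add(1, "vertices");
    System.out.println(store.fetch(1));          // existing partition
    System.out.println(store.fetch(2).length()); // freshly created, empty
    System.out.println(store.contains(2));       // now present in the store
  }
}
```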
OutOfCoreEngine.getNextPartition
public Integer getNextPartition() {
  Integer partitionId;
  synchronized (partitionAvailable) {
    while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
      try {
        partitionAvailable.wait(MSEC_TO_WAIT);
      } catch (InterruptedException e) {
        throw new IllegalStateException("getNextPartition: caught " +
            "InterruptedException while waiting to retrieve a partition to " +
            "process");
      }
    }
    if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
      partitionAvailable.notifyAll();
      partitionId = null;
    }
  }
  return partitionId;
}
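In the method above, a null from the manager means "nothing is ready yet, wait and retry", while the NO_PARTITION_TO_PROCESS sentinel means "everything is done". The following sketch reproduces that timed wait/notifyAll loop in isolation. TimedWaitDemo, DONE, WAIT_MS, offer, and finish are hypothetical names introduced for the illustration; unlike the original, the sketch also restores the interrupt flag before rethrowing.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the wait/notify pattern: not Giraph's OutOfCoreEngine.
class TimedWaitDemo {
  static final int DONE = -1;             // sentinel: nothing left at all
  private static final long WAIT_MS = 50; // assumed timeout for the demo

  private final Object available = new Object();
  private final Queue<Integer> ready = new ArrayDeque<>();
  private boolean finished = false;

  // Producer side: publish a ready id and wake up waiters.
  void offer(int id) {
    synchronized (available) {
      ready.add(id);
      available.notifyAll();
    }
  }

  // Producer side: signal that no more ids will ever arrive.
  void finish() {
    synchronized (available) {
      finished = true;
      available.notifyAll();
    }
  }

  // Returns a ready id, DONE once finished and drained, or null if not ready.
  private Integer poll() {
    Integer id = ready.poll();
    if (id != null) {
      return id;
    }
    return finished ? Integer.valueOf(DONE) : null;
  }

  // Consumer side: block until an id is ready; null means everything is done.
  Integer next() {
    synchronized (available) {
      Integer id;
      while ((id = poll()) == null) {
        try {
          available.wait(WAIT_MS); // timed wait, re-check on wake-up
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting", e);
        }
      }
      if (id == DONE) {
        available.notifyAll(); // let other waiters observe completion too
        return null;
      }
      return id;
    }
  }

  public static void main(String[] args) {
    TimedWaitDemo demo = new TimedWaitDemo();
    new Thread(() -> {
      demo.offer(7);
      demo.finish();
    }).start();
    System.out.println(demo.next()); // the offered id
    System.out.println(demo.next()); // null: nothing left
  }
}
```

The timeout means a waiter re-polls periodically even if a notification is missed, at the cost of some wake-ups that find nothing to do.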
MetaPartitionManager.getNextPartition
public Integer getNextPartition() {
  if (numPartitionsProcessed.get() >= partitions.size()) {
    return NO_PARTITION_TO_PROCESS;
  }
  int numThreads = perThreadPartitionDictionary.size();
  int index = randomGenerator.nextInt(numThreads);
  int startIndex = index;
  MetaPartition meta;
  do {
    // We first look up a partition in the reverse dictionary. If there is a
    // partition with the given properties, we then check whether we can
    // return it as the next partition to process. If we cannot, there may
    // still be other partitions in the dictionary, so we will continue
    // looping through all of them. If all the partitions with our desired
    // properties has been examined, we will break the loop.
    while (true) {
      meta = perThreadPartitionDictionary.get(index).lookup(
          ProcessingState.UNPROCESSED,
          StorageState.IN_MEM,
          StorageState.IN_MEM,
          null);
      if (meta != null) {
        // Here we should check if the 'meta' still has the same property as
        // when it was looked up in the dictionary. There may be a case where
        // meta changes from the time it is looked up until the moment the
        // synchronize block is granted to progress.
        synchronized (meta) {
          if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
              meta.getPartitionState() == StorageState.IN_MEM &&
              meta.getCurrentMessagesState() == StorageState.IN_MEM) {
            perThreadPartitionDictionary.get(index).removePartition(meta);
            meta.setProcessingState(ProcessingState.IN_PROCESS);
            perThreadPartitionDictionary.get(index).addPartition(meta);
            return meta.getPartitionId();
          }
        }
      } else {
        break;
      }
    }
    index = (index + 1) % numThreads;
  } while (index != startIndex);
  return null;
}
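The outer do-while loop reduces to the following skeleton. Because index starts at a random position and wraps around with the modulo, every per-thread dictionary is visited once and the search does not always begin at index 0.
do {
  index = (index + 1) % numThreads;
} while (index != startIndex);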
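To see the wrap-around concretely, the skeleton can be run with a fixed start index instead of a random one. CircularScanDemo and visitOrder are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the circular scan; not Giraph code.
class CircularScanDemo {
  // Returns the order in which buckets are visited when starting at startIndex.
  static List<Integer> visitOrder(int numBuckets, int startIndex) {
    List<Integer> order = new ArrayList<>();
    int index = startIndex;
    do {
      order.add(index);                 // "examine" the current bucket
      index = (index + 1) % numBuckets; // advance, wrapping past the end
    } while (index != startIndex);      // stop after one full circle
    return order;
  }

  public static void main(String[] args) {
    System.out.println(visitOrder(4, 2)); // [2, 3, 0, 1]
  }
}
```

A do-while is needed rather than a while: the termination test index != startIndex is false before the first iteration, so a plain while loop would never run.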
The inner while loop reduces to the following pseudocode.
while (true) {
  meta = perThreadPartitionDictionary.get(index).lookup(
      ProcessingState.UNPROCESSED,
      StorageState.IN_MEM,
      StorageState.IN_MEM,
      null);
  if (meta != null) {
    if (/* meta still has the looked-up state */) {
      return meta.getPartitionId();
    } else {
      // continue the inner loop and look up again
    }
  } else {
    // meta is null: go back to the outer loop
    break;
  }
}
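The inner loop is an instance of "look up without a lock, then lock the candidate and re-check before claiming it". A minimal sketch of that idea follows, assuming a flat list of tasks in place of the per-thread dictionaries; ClaimDemo, Task, and countClaims are hypothetical names, and the volatile field is a choice made for the sketch rather than something taken from Giraph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "look up, lock, re-check, claim": not Giraph code.
class ClaimDemo {
  enum State { UNPROCESSED, IN_PROCESS }

  static final class Task {
    final int id;
    volatile State state = State.UNPROCESSED;

    Task(int id) {
      this.id = id;
    }
  }

  private final List<Task> tasks = new ArrayList<>();

  ClaimDemo(int numTasks) {
    for (int i = 0; i < numTasks; i++) {
      tasks.add(new Task(i));
    }
  }

  // Lock-free lookup; the task may be claimed by another thread before we lock.
  private Task lookup() {
    for (Task task : tasks) {
      if (task.state == State.UNPROCESSED) {
        return task;
      }
    }
    return null;
  }

  // Returns the id of a newly claimed task, or null when none are unprocessed.
  Integer claimNext() {
    while (true) {
      Task task = lookup();
      if (task == null) {
        return null;
      }
      synchronized (task) {
        // Re-check under the lock: the state may have changed since lookup().
        if (task.state == State.UNPROCESSED) {
          task.state = State.IN_PROCESS;
          return task.id;
        }
      }
      // Lost the race for this task; look up another candidate.
    }
  }

  // Lets numThreads threads claim tasks concurrently and counts all claims.
  static int countClaims(int numTasks, int numThreads) {
    ClaimDemo demo = new ClaimDemo(numTasks);
    AtomicInteger claims = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < numThreads; t++) {
      Thread thread = new Thread(() -> {
        while (demo.claimNext() != null) {
          claims.incrementAndGet();
        }
      });
      threads.add(thread);
      thread.start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while joining", e);
      }
    }
    return claims.get();
  }

  public static void main(String[] args) {
    // Each task is claimed exactly once, so the count equals the task count.
    System.out.println(countClaims(100, 4));
  }
}
```

Without the re-check inside the synchronized block, two threads that looked up the same task could both claim it.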
ComputeCallable.computePartition
private PartitionStats computePartition(
    Computation<I, V, E, M1, M2> computation,
    Partition<I, V, E> partition, OutOfCoreEngine oocEngine,
    boolean ignoreExistingVertices)
    throws IOException, InterruptedException {
  synchronized (partition) {
    if (ignoreExistingVertices) {
      Iterable<I> destinations =
          messageStore.getPartitionDestinationVertices(partition.getId());
      // deal with messages.
    } else {
      int count = 0;
      for (Vertex<I, V, E> vertex : partition) {
        Iterable<M1> messages =
            messageStore.getVertexMessages(vertex.getId());
        computation.compute(vertex, messages);
        messageStore.clearVertexMessages(vertex.getId());
      }
      // (the rest of the method is omitted in this excerpt)
The for loop over the partition (Vertex<I, V, E> vertex : partition) calls SimplePartition’s iterator() method.
SimplePartition.iterator
@Override
public Iterator<Vertex<I, V, E>> iterator() {
  return vertexMap.values().iterator();
}
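The same delegation can be reproduced with a toy class to show why the enhanced for loop works on a partition: any class that implements Iterable can be the target of the loop. ToyPartition and its String "vertices" are hypothetical, and a LinkedHashMap is assumed here only to get a predictable iteration order in the demo.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SimplePartition: not Giraph code.
class ToyPartition implements Iterable<String> {
  // Vertex id -> vertex; insertion-ordered for a deterministic demo.
  private final Map<Integer, String> vertexMap = new LinkedHashMap<>();

  void putVertex(int id, String vertex) {
    vertexMap.put(id, vertex);
  }

  // The enhanced for loop calls this method behind the scenes.
  @Override
  public Iterator<String> iterator() {
    return vertexMap.values().iterator();
  }

  // Collects the vertices in iteration order using the enhanced for loop.
  static List<String> collect(ToyPartition partition) {
    List<String> seen = new ArrayList<>();
    for (String vertex : partition) {
      seen.add(vertex);
    }
    return seen;
  }

  public static void main(String[] args) {
    ToyPartition partition = new ToyPartition();
    partition.putVertex(10, "a");
    partition.putVertex(20, "b");
    System.out.println(collect(partition)); // [a, b]
  }
}
```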