The Process of Vertex Computation

ComputeCallable is the callable that performs vertex computation. Each compute thread runs one ComputeCallable, which processes partitions one at a time; processing a partition returns a PartitionStats object.

ComputeCallable.call

ComputeCallable.call consists of three parts.

public Collection<PartitionStats> call() {
   // first step: init variables
   // second step: process partitions
   // third step: clean up, update counters, and return the result
}
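The three-step shape can be sketched with a plain java.util.concurrent.Callable. This is only a minimal sketch under stated assumptions, not Giraph's code: StatsSketch, PartitionWorkerSketch, runAll, and the queue of integer ids standing in for the partition store are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for PartitionStats: records which partition was processed.
class StatsSketch {
  final int partitionId;
  StatsSketch(int partitionId) { this.partitionId = partitionId; }
}

// Hypothetical worker that mirrors the three steps of ComputeCallable.call.
class PartitionWorkerSketch implements Callable<Collection<StatsSketch>> {
  private final Queue<Integer> pending; // shared queue of partition ids (assumption)

  PartitionWorkerSketch(Queue<Integer> pending) { this.pending = pending; }

  @Override
  public Collection<StatsSketch> call() {
    // Step 1: init variables.
    List<StatsSketch> statsList = new ArrayList<>();
    // Step 2: process partitions until none are left.
    Integer id;
    while ((id = pending.poll()) != null) {
      statsList.add(new StatsSketch(id));
    }
    // Step 3: clean up (nothing to do in this sketch) and return the stats.
    return statsList;
  }

  // Runs numThreads workers over numPartitions ids; returns how many were processed.
  static int runAll(int numThreads, int numPartitions) {
    Queue<Integer> pending = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < numPartitions; i++) {
      pending.add(i);
    }
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<Collection<StatsSketch>>> futures = new ArrayList<>();
      for (int t = 0; t < numThreads; t++) {
        futures.add(pool.submit(new PartitionWorkerSketch(pending)));
      }
      int total = 0;
      for (Future<Collection<StatsSketch>> f : futures) {
        total += f.get().size();
      }
      return total;
    } catch (Exception e) {
      throw new IllegalStateException("worker failed", e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Calling PartitionWorkerSketch.runAll(3, 10) starts three workers that together drain ten partition ids, so each worker returns a collection covering several partitions rather than a single one.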

first step: Init Variables

ComputeCallable.call first sets up some variables.

WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
        new NettyWorkerClientRequestProcessor<I, V, E>(
            context, configuration, serviceWorker,
            configuration.getOutgoingMessageEncodeAndStoreType().
              useOneMessageToManyIdsEncoding());
    WorkerThreadGlobalCommUsage aggregatorUsage =
        serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();

    vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();

    Computation<I, V, E, M1, M2> computation =
        (Computation<I, V, E, M1, M2>) configuration.createComputation();
    computation.initialize(graphState, workerClientRequestProcessor,
        serviceWorker, aggregatorUsage);
    computation.preSuperstep();

    List<PartitionStats> partitionStatsList = Lists.newArrayList();
    PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
    OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
    GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();

second step: process partitions

The process-partitions step is an infinite loop that iterates over the partitions by calling partitionStore.getNextPartition(). The loop exits when the returned partition is null.

while (true) {
      Partition<I, V, E> partition = partitionStore.getNextPartition();
      if (partition == null) {
        break;
      }
     
      try {
        serviceWorker.getServerData().resolvePartitionMutation(partition);
        PartitionStats partitionStats = computePartition(
            computation, partition, oocEngine,
            serviceWorker.getConfiguration().getIncomingMessageClasses()
              .ignoreExistingVertices());
        partitionStatsList.add(partitionStats);
      } catch (IOException e) {
      
      } finally {
        partitionStore.putPartition(partition);
      }
    }
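The loop above takes a partition out of the store and always hands it back in the finally block, even when processing fails. The following is a minimal sketch of that take, process, put-back pattern. StoreSketch and LoopSketch are hypothetical classes, and the failure on odd ids is simulated purely for the demo.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical store: hands out ids and counts how many are currently checked out.
class StoreSketch {
  private final Deque<Integer> unprocessed = new ArrayDeque<>();
  int checkedOut = 0;

  StoreSketch(int numPartitions) {
    for (int i = 0; i < numPartitions; i++) {
      unprocessed.add(i);
    }
  }

  // Returns the next id, or null when nothing is left to process.
  Integer getNext() {
    Integer id = unprocessed.poll();
    if (id != null) {
      checkedOut++;
    }
    return id;
  }

  // Hands a checked-out id back to the store.
  void put(int id) {
    checkedOut--;
  }
}

class LoopSketch {
  // Processes every id; odd ids simulate a failure. Returns the number of successes.
  static int drain(StoreSketch store) {
    int succeeded = 0;
    while (true) {
      Integer id = store.getNext();
      if (id == null) {
        break; // same exit condition as partition == null
      }
      try {
        if (id % 2 == 1) {
          throw new IllegalStateException("simulated failure for id " + id);
        }
        succeeded++;
      } catch (IllegalStateException e) {
        // Swallowed only to keep the demo running.
      } finally {
        store.put(id); // the id is always handed back
      }
    }
    return succeeded;
  }
}
```

After LoopSketch.drain(new StoreSketch(5)) returns, the store's checkedOut counter is back at zero even though two of the five ids failed.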

DiskBackedPartitionStore.getNextPartition

In this method, the partitionStore field that DiskBackedPartitionStore delegates to is a SimplePartitionStore.

public Partition<I, V, E> getNextPartition() {
    Integer partitionId = oocEngine.getNextPartition();
    if (partitionId == null) {
      return null;
    }
    Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
    if (partition == null) {
      if (LOG.isInfoEnabled()) {
        LOG.info("getNextPartition: partition " + partitionId + " is not in " +
            "the partition store. Creating an empty partition for it.");
      }
      partition = conf.createPartition(partitionId, context);
    }
    partitionStore.addPartition(partition);
    return partition;
  }

OutOfCoreEngine.getNextPartition

public Integer getNextPartition() {
    Integer partitionId;
    synchronized (partitionAvailable) {
      while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
        try {
          partitionAvailable.wait(MSEC_TO_WAIT);
        } catch (InterruptedException e) {
          throw new IllegalStateException("getNextPartition: caught " +
              "InterruptedException while waiting to retrieve a partition to " +
              "process");
        }
      }
      if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
        partitionAvailable.notifyAll();
        partitionId = null;
      }
    }
    return partitionId;
  }

MetaPartitionManager.getNextPartition

public Integer getNextPartition() {
    if (numPartitionsProcessed.get() >= partitions.size()) {
      return NO_PARTITION_TO_PROCESS;
    }
    int numThreads = perThreadPartitionDictionary.size();
    int index = randomGenerator.nextInt(numThreads);
    int startIndex = index;
    MetaPartition meta;
    do {
      // We first look up a partition in the reverse dictionary. If there is a
      // partition with the given properties, we then check whether we can
      // return it as the next partition to process. If we cannot, there may
      // still be other partitions in the dictionary, so we will continue
      // looping through all of them. If all the partitions with our desired
      // properties has been examined, we will break the loop.
      while (true) {
        meta = perThreadPartitionDictionary.get(index).lookup(
            ProcessingState.UNPROCESSED,
            StorageState.IN_MEM,
            StorageState.IN_MEM,
            null);
        if (meta != null) {
          // Here we should check if the 'meta' still has the same property as
          // when it was looked up in the dictionary. There may be a case where
          // meta changes from the time it is looked up until the moment the
          // synchronize block is granted to progress.
          synchronized (meta) {
            if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
                meta.getPartitionState() == StorageState.IN_MEM &&
                meta.getCurrentMessagesState() == StorageState.IN_MEM) {
              perThreadPartitionDictionary.get(index).removePartition(meta);
              meta.setProcessingState(ProcessingState.IN_PROCESS);
              perThreadPartitionDictionary.get(index).addPartition(meta);
              return meta.getPartitionId();
            }
          }
        } else {
          break;
        }
      }
      index = (index + 1) % numThreads;
    } while (index != startIndex);
    return null;
  }

The outer loop resembles the following code. Because index starts at a random value, the search does not always begin from index zero.

do {
   index = (index + 1) % numThreads;
} while (index != startIndex);
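To see the visiting order this produces, here is a small self-contained sketch. RoundRobinSketch and visitOrder are hypothetical names, and the start index is passed in explicitly instead of being drawn from a random generator so that the result is reproducible.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
  // Returns the order in which slots are visited when starting at startIndex.
  static List<Integer> visitOrder(int startIndex, int numSlots) {
    List<Integer> order = new ArrayList<>();
    int index = startIndex;
    do {
      order.add(index);
      index = (index + 1) % numSlots; // wrap around past the last slot
    } while (index != startIndex);    // stop after one full cycle
    return order;
  }
}
```

With startIndex 2 and four slots the order is 2, 3, 0, 1: every slot is visited exactly once, and different start indices spread the first lookup across slots.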

The inner while loop resembles the following code.

while (true) {
    meta = perThreadPartitionDictionary.get(index).lookup(
        ProcessingState.UNPROCESSED,
        StorageState.IN_MEM,
        StorageState.IN_MEM,
        null);
    if (meta != null) {
        if (meta still has the looked-up state) {
            return meta.getPartitionId();
        } else {
            // continue the inner loop
        }
    } else {
        // meta is null: go back to the outer loop
        break;
    }
}
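The "meta is ok" check exists because the state can change between the lookup and the moment the lock is acquired, so the state is tested again inside the synchronized block. A minimal sketch of that re-check follows. MetaSketch, tryClaim, and ClaimRaceSketch are hypothetical names, and the two-value State enum is a simplification of the states shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for MetaPartition.
class MetaSketch {
  enum State { UNPROCESSED, IN_PROCESS }

  private State state = State.UNPROCESSED;

  // Re-checks the state under the lock; returns true for exactly one caller.
  boolean tryClaim() {
    synchronized (this) {
      if (state == State.UNPROCESSED) {
        state = State.IN_PROCESS;
        return true;
      }
      return false; // another thread claimed it first
    }
  }
}

class ClaimRaceSketch {
  // Lets numThreads threads race for one MetaSketch; returns how many succeeded.
  static int raceClaims(int numThreads) {
    MetaSketch meta = new MetaSketch();
    AtomicInteger winners = new AtomicInteger();
    Thread[] threads = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] = new Thread(() -> {
        if (meta.tryClaim()) {
          winners.incrementAndGet();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return winners.get();
  }
}
```

However many threads race, only one of them sees UNPROCESSED inside the lock, so a partition is handed to at most one thread.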

ComputeCallable.computePartition

private PartitionStats computePartition(
      Computation<I, V, E, M1, M2> computation,
      Partition<I, V, E> partition, OutOfCoreEngine oocEngine,
      boolean ignoreExistingVertices)
      throws IOException, InterruptedException {
    synchronized (partition) {
      if (ignoreExistingVertices) {
        Iterable<I> destinations =
            messageStore.getPartitionDestinationVertices(partition.getId());
        // deal with messages.
      } else {
        int count = 0;
        for (Vertex<I, V, E> vertex : partition) {
          Iterable<M1> messages =
              messageStore.getVertexMessages(vertex.getId());
          computation.compute(vertex, messages);
          messageStore.clearVertexMessages(vertex.getId());
        }
        // remainder of the method omitted

The for loop over Vertex<I, V, E> vertex : partition calls SimplePartition's iterator() method.

SimplePartition.iterator

 @Override
  public Iterator<Vertex<I, V, E>> iterator() {
    return vertexMap.values().iterator();
  }
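A for-each loop works on any class that implements Iterable, which is why delegating iterator() to the map's values is enough. Here is a minimal sketch of the same delegation. PartitionSketch is a hypothetical class, vertices are reduced to plain strings, and the choice of ConcurrentHashMap is an assumption made for the example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical partition that stores vertex values keyed by vertex id.
class PartitionSketch implements Iterable<String> {
  private final Map<Integer, String> vertexMap = new ConcurrentHashMap<>();

  void put(int id, String vertex) {
    vertexMap.put(id, vertex);
  }

  @Override
  public Iterator<String> iterator() {
    // Same delegation as the iterator() method shown above.
    return vertexMap.values().iterator();
  }

  // Counts vertices by walking the partition with a for-each loop.
  static int count(PartitionSketch partition) {
    int n = 0;
    for (String vertex : partition) {
      n++;
    }
    return n;
  }
}
```

The for-each loop in count compiles to calls on iterator(), hasNext(), and next(), so it visits each stored vertex once.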

Reposted from blog.csdn.net/houzhizhen/article/details/106789873