Mahout Recommendation Engine on Hadoop (Part 3): Multiplying the Co-occurrence Matrix by the User Vectors

Step 3 prepares the multiplication of the co-occurrence matrix by the user vectors:

    //start the multiplication of the co-occurrence matrix by the user vectors
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      //first MapReduce job
      Job prePartialMultiply1 = prepareJob(
              similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
              SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
              Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
              SequenceFileOutputFormat.class);
      boolean succeeded = prePartialMultiply1.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
      //second MapReduce job
      //continue the multiplication
      Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
              prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
              VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
              SequenceFileOutputFormat.class);
      if (usersFile != null) {
        prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
      }
      prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
              maxPrefsPerUser);
      succeeded = prePartialMultiply2.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
      //finish the job
      //third MapReduce job
      Job partialMultiply = prepareJob(
              new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
              SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
              ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
              SequenceFileOutputFormat.class);
      setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
      succeeded = partialMultiply.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
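
Taken together, the three jobs form a simple pipeline over the paths named in the driver code above:

    similarityMatrixPath ----(SimilarityMatrixRowWrapperMapper)---> prePartialMultiplyPath1
    USER_VECTORS ------------(UserVectorSplitterMapper)-----------> prePartialMultiplyPath2
    prePartialMultiplyPath1 + prePartialMultiplyPath2 --(ToVectorAndPrefReducer)--> partialMultiplyPath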

Below, we analyze the details of these three MapReduce jobs:

1. Mapper: the SimilarityMatrixRowWrapperMapper class takes one row of the co-occurrence matrix and wraps it in a VectorOrPrefWritable, so that its output type matches that of UserVectorSplitterMapper.

public final class SimilarityMatrixRowWrapperMapper extends
    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
  
  //wrap one row of the co-occurrence matrix in a VectorOrPrefWritable,
  //matching the output type of UserVectorSplitterMapper
  @Override
  protected void map(IntWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    Vector similarityMatrixRow = value.get();
    /* remove self similarity */
    similarityMatrixRow.set(key.get(), Double.NaN);
    context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
  }

}
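
To make the wrapping concrete, here is a minimal runnable sketch of what this mapper emits for one co-occurrence matrix row; the 3-item universe and all values are hypothetical:

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class WrapRowSketch {
  public static void main(String[] args) {
    // hypothetical co-occurrence matrix row for item index 1 in a 3-item universe
    Vector row = new RandomAccessSparseVector(3);
    row.set(0, 0.8);  // similarity(item_1, item_0)
    row.set(1, 1.0);  // self similarity
    row.set(2, 0.4);  // similarity(item_1, item_2)

    row.set(1, Double.NaN);  // the mapper blanks out the self similarity
    // the mapper would emit (1, wrapped), i.e. (1, {0:0.8, 1:NaN, 2:0.4})
    VectorOrPrefWritable wrapped = new VectorOrPrefWritable(row);
    System.out.println(wrapped.getVector());
  }
}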

2. Mapper: UserVectorSplitterMapper

//input format:  theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>,...,<itemid_indexN,prefN>
//output format: itemid1:<theUserID,pref1>
//               itemid2:<theUserID,pref2>
//               itemid3:<theUserID,pref3>
//               ......
//               itemidN:<theUserID,prefN>
public final class UserVectorSplitterMapper extends
    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
  
  @Override
  protected void map(VarLongWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    long userID = key.get();
    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
      return;
    }
    Vector userVector = maybePruneUserVector(value.get());
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    VarIntWritable itemIndexWritable = new VarIntWritable();
    VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      itemIndexWritable.set(e.index());
      vectorOrPref.set(userID, (float) e.get());
      context.write(itemIndexWritable, vectorOrPref);
    }
  }

}
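
To see the fan-out concretely, here is a minimal runnable sketch (the user ID and ratings are hypothetical) that walks a user vector the same way the mapper does:

import java.util.Iterator;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class SplitUserVectorSketch {
  public static void main(String[] args) {
    long userID = 123L;  // hypothetical user
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 2);
    userVector.set(5, 4.0);  // pref(123, item_5)
    userVector.set(9, 3.5);  // pref(123, item_9)

    // one (itemIndex, <userID, pref>) pair per non-zero entry
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      System.out.println(e.index() + " -> <" + userID + ", " + (float) e.get() + ">");
    }
    // prints (in some order): 5 -> <123, 4.0> and 9 -> <123, 3.5>
  }
}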

3. Reducer: the ToVectorAndPrefReducer class collects, for each itemid, the corresponding row of the co-occurrence matrix together with the users who rated that item and their ratings; the final output is itemid_index, VectorAndPrefsWritable(vector, List<userid>, List<pref>).

public final class ToVectorAndPrefReducer extends
    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {

  //collect all values whose key is this itemid
  @Override
  protected void reduce(VarIntWritable key,
                        Iterable<VectorOrPrefWritable> values,
                        Context context) throws IOException, InterruptedException {

    List<Long> userIDs = Lists.newArrayList();
    List<Float> prefValues = Lists.newArrayList();
    Vector similarityMatrixColumn = null;
    for (VectorOrPrefWritable value : values) {
      if (value.getVector() == null) {
        // Then this is a user-pref value
        userIDs.add(value.getUserID());
        prefValues.add(value.getValue());
      } else {
        // Then this is the column vector
        //(one row of the co-occurrence matrix: the row whose index is this itemid)
        if (similarityMatrixColumn != null) {
          throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
        }
        similarityMatrixColumn = value.getVector();
      }
    }

    if (similarityMatrixColumn == null) {
      return;
    }
    //emit the co-occurrence matrix row for this itemid together with the users who rated the item and their ratings
    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
    context.write(key, vectorAndPrefs);
  }

}
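
A minimal sketch of one reduced record, with hypothetical users and ratings for item index 1:

import java.util.Arrays;
import java.util.List;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorAndPrefsSketch {
  public static void main(String[] args) {
    // row 1 of the co-occurrence matrix, self similarity already set to NaN
    Vector similarityRow = new RandomAccessSparseVector(3);
    similarityRow.set(0, 0.8);
    similarityRow.set(1, Double.NaN);
    similarityRow.set(2, 0.4);

    List<Long> userIDs = Arrays.asList(123L, 456L);      // users who rated item_1
    List<Float> prefValues = Arrays.asList(4.0f, 2.5f);  // their ratings of item_1

    // the reducer would emit (1, vectorAndPrefs)
    VectorAndPrefsWritable vectorAndPrefs =
        new VectorAndPrefsWritable(similarityRow, userIDs, prefValues);
    System.out.println(vectorAndPrefs.getUserIDs());
  }
}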

Step 4: multiply the co-occurrence matrix by the user vectors to obtain the recommendation results.

 //extract out the recommendations
      Job aggregateAndRecommend = prepareJob(
              new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
              PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
              AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
              TextOutputFormat.class);
      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
 

Mapper: PartialMultiplyMapper

  //input:  (itemid_index, <array of userids, array of prefs, row itemid_index of the co-occurrence matrix>)
  //output: (userid, <this user's pref for itemid_index1, row itemid_index1 of the co-occurrence matrix>)
  //        (userid, <this user's pref for itemid_index2, row itemid_index2 of the co-occurrence matrix>)
  //        .....
  //        (userid, <this user's pref for itemid_indexN, row itemid_indexN of the co-occurrence matrix>)
public final class PartialMultiplyMapper extends
    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {

  @Override
  protected void map(VarIntWritable key,
                     VectorAndPrefsWritable vectorAndPrefsWritable,
                     Context context) throws IOException, InterruptedException {

    Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
    List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
    List<Float> prefValues = vectorAndPrefsWritable.getValues();

    VarLongWritable userIDWritable = new VarLongWritable();
    PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();

    for (int i = 0; i < userIDs.size(); i++) {
      long userID = userIDs.get(i);
      float prefValue = prefValues.get(i);
      if (!Float.isNaN(prefValue)) {
        prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
        userIDWritable.set(userID);
        context.write(userIDWritable, prefAndSimilarityColumn);
      }
    }
  }

}
 

Reducer: the AggregateAndRecommendReducer class performs the partial multiply in the reducer and keeps the items with the highest resulting recommendation scores. For non-boolean data, items are ranked by the prediction value obtained from the partial multiply of the preferences with the similarity-matrix columns. For boolean data every pref value is 1.0f, so computing the matrix product gains nothing; summing the similarity values directly is enough, and sorting by those sums yields the recommendation results.
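
As a worked example of the non-boolean case (all numbers are hypothetical), suppose the user rated item_2 and item_3 and we are predicting item_1:

public class PredictionSketch {
  public static void main(String[] args) {
    // prediction(u, item_1) = sum(similarity * pref) / sum(|similarity|)
    double sim12 = 0.8, sim13 = 0.4;  // similarity(item_1, item_2), similarity(item_1, item_3)
    double pref2 = 4.0, pref3 = 2.5;  // pref(u, item_2), pref(u, item_3)

    double numerator   = sim12 * pref2 + sim13 * pref3;      // 3.2 + 1.0 = 4.2
    double denominator = Math.abs(sim12) + Math.abs(sim13);  // 0.8 + 0.4 = 1.2
    System.out.println(numerator / denominator);             // prediction = 4.2 / 1.2 = 3.5
  }
}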

public final class AggregateAndRecommendReducer extends
    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
 @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    if (booleanData) {
      reduceBooleanData(userID, values, context);
    } else {
      reduceNonBooleanData(userID, values, context);
    }
  }

  private void reduceBooleanData(VarLongWritable userID,
                                 Iterable<PrefAndSimilarityColumnWritable> values,
                                 Context context) throws IOException, InterruptedException {
    /* having boolean data, each estimated preference can only be 1,
     * however we can't use this to rank the recommended items,
     * so we use the sum of similarities for that. */
    Vector predictionVector = null;
    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      predictionVector = predictionVector == null
          ? prefAndSimilarityColumn.getSimilarityColumn()
          : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
    }
    writeRecommendedItems(userID, predictionVector, context);
  }

  private void reduceNonBooleanData(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    /* each entry here is the sum in the numerator of the prediction formula */
    Vector numerators = null;
    /* each entry here is the sum in the denominator of the prediction formula */
    Vector denominators = null;
    /* each entry here is the number of similar items used in the prediction formula */
    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);

    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
      float prefValue = prefAndSimilarityColumn.getPrefValue();
      /* count the number of items used for each prediction */
      Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
      while (usedItemsIterator.hasNext()) {
        int itemIDIndex = usedItemsIterator.next().index();
        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
      }
      //vector.times(double) multiplies the vector by a scalar: every entry is multiplied by that number
      //vector.plus(vector) adds two vectors entry-wise

      //numerators is a vector; for example, the element at index item_1 has the value:
      /*
         similarity(item_1, item_2) * pref(userid, item_2)
       + similarity(item_1, item_3) * pref(userid, item_3)
       + similarity(item_1, item_4) * pref(userid, item_4)
       + ......
       + similarity(item_1, item_N) * pref(userid, item_N)
      */
      // note: similarity(item_1, item_2) is the similarity between item_1 and item_2,
      // and pref(userid, item) is user userid's rating of item
     
      numerators = numerators == null
          ? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
          : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));
      
      
      
      simColumn.assign(ABSOLUTE_VALUES);
      //denominators is a vector; for example, the element at index item_1 has the value:
      /*
         |similarity(item_1, item_2)| + |similarity(item_1, item_3)| + ...... + |similarity(item_1, item_N)|
      */
      // note: similarity(item_1, item_2) is the similarity between item_1 and item_2;
      // the absolute values come from the assign(ABSOLUTE_VALUES) call above
      denominators = denominators == null ? simColumn : denominators.plus(simColumn);
    }

    if (numerators == null) {
      return;
    }

    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    Iterator<Vector.Element> iterator = numerators.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element element = iterator.next();
      int itemIDIndex = element.index();
      /* preference estimations must be based on at least 2 datapoints */
      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
        /* compute normalized prediction */
        double prediction = element.get() / denominators.getQuick(itemIDIndex);
        recommendationVector.setQuick(itemIDIndex, prediction);
      }
    }
    writeRecommendedItems(userID, recommendationVector, context);
  }
}
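
writeRecommendedItems is not shown in this excerpt; in Mahout it keeps only the top-N entries of the recommendation vector, maps the item indices back to item IDs, and writes them as a RecommendedItemsWritable. A minimal sketch of just the top-N selection, assuming a plain descending sort by prediction value:

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.mahout.math.Vector;

public class TopNSketch {
  // return the n (itemIndex, prediction) pairs with the highest predictions
  static List<Map.Entry<Integer, Double>> topN(Vector recommendationVector, int n) {
    List<Map.Entry<Integer, Double>> entries = new ArrayList<>();
    Iterator<Vector.Element> it = recommendationVector.iterateNonZero();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      entries.add(new AbstractMap.SimpleEntry<>(e.index(), e.get()));
    }
    // sort by prediction value, descending
    entries.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
    return entries.subList(0, Math.min(n, entries.size()));
  }
}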


Reposted from eric-gcm.iteye.com/blog/1820060