nutch generator 详解



job1
map Selector
    输入目录为crawldb/current
    输入key:Text 为url ,Value:CrawlDatum
    功能如下
       1 如果filter为true URLFilter过滤,如果过滤的后的url为空返回
      2 调用(!schedule.shouldFetch(url, crawlDatum, curTime)方法计算是不是要fetch 逻辑是看value的当前fetchtime和当前时间的比较,大于返回true,否则为false,这里有个逻辑是如果当前value的fetchtime减去当前时间大约最大间隔时间,并且当前的value的间隔时间大于最大间隔时间重新设置。代码如下
    // pages are never truly GONE - we have to check them from time to time.
    // pages with too long fetchInterval are adjusted so that they fit within
    // maximum fetchInterval (segment retention period).
    if (datum.getFetchTime() - curTime > (long) maxInterval * 1000) {
      if (datum.getFetchInterval() > maxInterval) {
        datum.setFetchInterval(maxInterval * 0.9f);
      }
      datum.setFetchTime(curTime);
    }
    if (datum.getFetchTime() > curTime) {
      return false;                                   // not time yet
    }
    return true;
  }

  返回false 则返回
    3 如果已经有fetch value的meta 会被写入Nutch.WRITABLE_GENERATE_TIME_KEY 的key 值为上次这个map的时间,这个上次写入时间与当前时间的差值和间隔时间配置crawl.gen.delay的值比较,如果大于这个时间,才能fetch
   4   sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);这个 scfilters最后调用OPICScoringFilter计算分数,是逻辑是当前value:crawlDatum 的datum.getScore() * sort,如果计算的sort 小于generate.min.score 这个配置的值返回
   5  写入 key:FloatWritable value:SelectorEntry  ,设置value 元数据的generator的时间 key为Nutch.WRITABLE_GENERATE_TIME_KEY,值为当前时间

      key 是步骤4中的sort   ,代码如下 
    sortValue.set(sort);
      // record generation time
      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
      entry.datum = crawlDatum;
      entry.url = (Text) key;
      output.collect(sortValue, entry); // invert for sort by score
   6 这里有个设置    job.setPartitionerClass(Selector.class);使用URLPartitioner split,这个根据配置partition.url.mode 根据ip或者domain 取hash
    
reduce Selector
  1 topN  ,这个值设置的一个segment的数据大小,如果一个segment大于这个值,如果有下一个segment,则取下一个segment,否则返回
  2 如设置了normalise为true normalizers不为空,调用normalize
  3 根据这个key:generate.count.mode,确定分组模式 是域名还是ip。
  4 根据这个generate.max.count  配置看 每个ip或域名分组在单个segment里面最大的值,如果为-1 则不检查,如果不为-1,超过这个值,写入下个segment,如果没有segment了。舍弃这个url。否则设置value的segment,entry.segnum = new IntWritable(hostCount[0]),如果为-1 直接设置segment entry.segnum = new IntWritable(currentsegmentnum);
   4 写入key :FloatWritable  ,value:SelectorEntry  ruduce的输出目录  Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
        + System.currentTimeMillis());
   5 reduce 设置了key的 Comparator DecreasingFloatComparator,不用转换对象进行字节比较
  类如下
  public static class DecreasingFloatComparator extends FloatWritable.Comparator {

    /** Compares two FloatWritables decreasing. */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return super.compare(b2, s2, l2, b1, s1, l1);
    }
  }

6 设置  job.setOutputFormat(GeneratorOutputFormat.class); 代码如下,取得文件名字
  // Allows the reducers to generate one subfile per
  public static class GeneratorOutputFormat extends
      MultipleSequenceFileOutputFormat<FloatWritable,SelectorEntry> {
    // generate a filename based on the segnum stored for this entry
    protected String generateFileNameForKeyValue(FloatWritable key, SelectorEntry value,
        String name) {
      return "fetchlist-" + value.segnum.toString() + "/" + name;
    }

  }

partitionSegment job
map :SelectorInverseMapper
     输入目录为上个job的输出目录下面的以fetchlist-segment 的目录
      1 map 主要是 将 k-v的转换,key:Text 为url value:SelectorEntry
代码如下
public void map(FloatWritable key, SelectorEntry value,
        OutputCollector<Text,SelectorEntry> output, Reporter reporter) throws IOException {
      SelectorEntry entry = (SelectorEntry) value;
      output.collect(entry.url, entry);
    }
     2   job.setPartitionerClass(URLPartitioner.class);
reduce :  PartitionReducer

输出目录 crawl/segments/当前时间/crawl_generate  例如:crawl/segments/20120716172043/crawl_generate
   1 ruduce只是做的k-v的转换 key:Text 为url,value:CrawlDatum
代码如下
public void reduce(Text key, Iterator<SelectorEntry> values,
        OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
      // if using HashComparator, we get only one input key in case of
      // hash collision
      // so use only URLs from values
      while (values.hasNext()) {
        SelectorEntry entry = values.next();
        output.collect(entry.url, entry.datum);
      }
    }
     2 设置    job.setOutputKeyComparatorClass(HashComparator.class);

  
删除第一个job的输出目录
如果设置了generate.update.crawldb 为true则运行第三个job

map :CrawlDbUpdater
输入目录为第二个job的输出目录,即生成的segment目录里面的crawl_generate目录例如 :crawl\segments\20120711150527\crawl_generate和/crawldb/current 目录,
    1 map只是合并目录

reduce :CrawlDbUpdater 去重复,保证一个url只有一条记录
   输出目录
  // update the db from tempDir
      Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
          + System.currentTimeMillis());

   最后调用 CrawlDb.install(job, dbDir); 更新当前crawldb
删除 这个job的输出文件

猜你喜欢

转载自chengqianl.iteye.com/blog/1595320