背景:使用kafka推送sku id,统计每个10min内sku出现次数的topN。
定时实现:
1、为bolt设置tick:bolt继承BaseBasicBolt/BaseRichBolt,重写getComponentConfiguration()方法。在方法中设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,单位是秒。
//设置10min发送一次tick心跳 @SuppressWarnings("static-access") @Override public Map<String, Object> getComponentConfiguration() { Config conf = new Config(); conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, period); return conf; }
2、设置后,此bolt的所有task每隔设定时间收到一个来自__system(系统的bolt id)的__tick (自己bolt的streamID为default,只有系统的为__tick)stream的tick tuple
@Override public void execute(Tuple tuple) { if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ writeToHbase(); counts.clear(); }else{ String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L;//如果不存在,初始化为0 } count++;//增加计数 this.counts.put(word, count);//存储计数 } }
注意:这是一个并列关系,即
@Override public void execute(Tuple tuple) { if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ //收到的tuple为tick tuple ,do something... }else{ //收到的tuple为正常 tuple ,do something...}
3、tick设置的优先级
storm中的tick也有优先级,即全局tick的作用域是全局bolt,每个bolt也可定义自己的tick。
当我们在整个Topology上设置tick和我们单个运算bolt上冲突时,更小范围的bolt设置的tick优先级更高!!!