Storm流之一个Spout分发多个Bolt
一、案例Demo
Spout中生成若干数字。
大于等于0、小于等于100的数字分发到MinBolt
大于100、小于等于200的数字分发到MidBolt
大于200 的数字分发到MaxBolt
二、FenFaTopology代码
package com.test.csdn;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Created by Simon on 2018/4/19.
 *
 * <p>Entry point that wires a single {@code FenFaSpout} to three bolts and
 * submits the resulting topology to a local cluster. Each bolt subscribes to
 * one named stream of the spout ("min", "mid" or "max").
 */
public class FenFaTopology {

    /**
     * Builds the topology and submits it to a {@link LocalCluster} under the
     * simple class name of {@code FenFaTopology}.
     *
     * <p>Any exception raised while creating the cluster or submitting the
     * topology is printed to standard error and otherwise ignored.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        TopologyBuilder wiring = describeTopology();
        String topologyName = FenFaTopology.class.getSimpleName();
        Config conf = new Config();

        try {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topologyName, conf, wiring.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Registers the spout and the three bolts, connecting every bolt to its
     * own stream of the spout with a local-or-shuffle grouping.
     */
    private static TopologyBuilder describeTopology() {
        TopologyBuilder wiring = new TopologyBuilder();
        wiring.setSpout("fenFaSpout", new FenFaSpout());
        wiring.setBolt("minBolt", new MinBolt())
                .localOrShuffleGrouping("fenFaSpout", "min");
        wiring.setBolt("midBolt", new MidBolt())
                .localOrShuffleGrouping("fenFaSpout", "mid");
        wiring.setBolt("maxBolt", new MaxBolt())
                .localOrShuffleGrouping("fenFaSpout", "max");
        return wiring;
    }
}
三、FenFaSpout代码
package com.test.csdn;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * Created by Simon on 2018/4/19.
 *
 * <p>Spout that produces numbers and routes each one to a named stream:
 * <ul>
 *   <li>values in [0, 100] go to the "min" stream (consumed by MinBolt);</li>
 *   <li>values in (100, 200] go to the "mid" stream (consumed by MidBolt);</li>
 *   <li>every other value goes to the "max" stream (consumed by MaxBolt);
 *       with the sample data below that means values above 200.</li>
 * </ul>
 */
public class FenFaSpout extends BaseRichSpout {

    private static final String MIN_STREAM = "min";
    private static final String MID_STREAM = "mid";
    private static final String MAX_STREAM = "max";

    /** Pause before each emitted tuple, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /** Sample values; one of them is picked at random on every call to {@link #nextTuple()}. */
    int[] arr = {123, 22, 45, 56, 235, 2, 118, 4, 5, 7, 45, 153, 187, 360, 288, 580, 12, 521, 121, 111};

    /** Reused across calls instead of allocating a new generator per tuple. */
    private final Random random = new Random();

    private SpoutOutputCollector collector;

    /**
     * Stores the collector that {@link #nextTuple()} emits through.
     *
     * @param map                  topology configuration; not used
     * @param topologyContext      task context; not used
     * @param spoutOutputCollector collector used to emit tuples
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * Sleeps for one second, then emits one randomly chosen sample value on
     * the stream that matches its range. If the sleep is interrupted, the
     * interrupt flag is restored and nothing is emitted for this call.
     */
    @Override
    public void nextTuple() {
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
            return;
        }
        // Bound by the array length so the sample data can change without touching this method.
        int value = arr[random.nextInt(arr.length)];
        collector.emit(streamFor(value), new Values(value));
    }

    /**
     * Declares the three output streams, each carrying a single field named "value".
     *
     * @param outputFieldsDeclarer declarer the streams are registered with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(MIN_STREAM, new Fields("value"));
        outputFieldsDeclarer.declareStream(MID_STREAM, new Fields("value"));
        outputFieldsDeclarer.declareStream(MAX_STREAM, new Fields("value"));
    }

    /** Returns the id of the stream a value is routed to. */
    private static String streamFor(int value) {
        if (value >= 0 && value <= 100) {
            return MIN_STREAM;
        }
        if (value > 100 && value <= 200) {
            return MID_STREAM;
        }
        return MAX_STREAM;
    }
}
四、MinBolt代码
package com.test.csdn;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by Simon on 2018/4/19.
 *
 * <p>Bolt that prints the integer carried in the first field of every tuple
 * it receives. It emits nothing downstream.
 */
public class MinBolt extends BaseBasicBolt {

    /**
     * Declares no output fields, since this bolt emits no tuples.
     *
     * @param declarer output declarer; left untouched
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

    /**
     * Reads the first field of the tuple as an Integer and prints it to
     * standard output.
     *
     * @param input the incoming tuple
     * @param out   output collector; not used
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector out) {
        System.out.println("MinBolt得到的值:" + input.getInteger(0));
    }
}
五、MidBolt代码
package com.test.csdn;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by Simon on 2018/4/19.
 *
 * <p>Bolt that prints the integer carried in the first field of every tuple
 * it receives. It emits nothing downstream.
 */
public class MidBolt extends BaseBasicBolt {

    /**
     * Declares no output fields, since this bolt emits no tuples.
     *
     * @param declarer output declarer; left untouched
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

    /**
     * Reads the first field of the tuple as an Integer and prints it to
     * standard output.
     *
     * @param input the incoming tuple
     * @param out   output collector; not used
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector out) {
        System.out.println("MidBolt得到的值:" + input.getInteger(0));
    }
}
六、MaxBolt代码
package com.test.csdn;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by Simon on 2018/4/19.
 *
 * <p>Bolt that prints the integer carried in the first field of every tuple
 * it receives. It emits nothing downstream.
 */
public class MaxBolt extends BaseBasicBolt {

    /**
     * Declares no output fields, since this bolt emits no tuples.
     *
     * @param declarer output declarer; left untouched
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }

    /**
     * Reads the first field of the tuple as an Integer and prints it to
     * standard output.
     *
     * @param input the incoming tuple
     * @param out   output collector; not used
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector out) {
        System.out.println("MaxBolt得到的值:" + input.getInteger(0));
    }
}
七、运行结果
MaxBolt得到的值:235 MidBolt得到的值:118 MinBolt得到的值:22 MinBolt得到的值:56 MinBolt得到的值:56 MinBolt得到的值:45 MidBolt得到的值:118 MinBolt得到的值:4 MinBolt得到的值:22 MinBolt得到的值:56 MaxBolt得到的值:580 MidBolt得到的值:118 MidBolt得到的值:118 MaxBolt得到的值:580 MinBolt得到的值:5 MidBolt得到的值:153 MidBolt得到的值:187 MinBolt得到的值:45 MidBolt得到的值:153