Storm流之FieldGrouping字段分组
一、 需求
字符串按照是否包含o分组
二、 Topology
package com.test.csdn; import com.test.storm.bolt.FieldPrintBolt; import com.test.storm.spout.SplitTestSpout; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; /** * Created by Simon on 2018/4/26. */ public class FiledGroupingTopo { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new FiledGroupingSpout(), 1); //指定5个,便于测试 builder.setBolt("bolt", new FiledGroupingBolt(), 5).fieldsGrouping("spout", new Fields("isContainsO")); Config conf = new Config(); conf.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("toplogy", conf, builder.createTopology()); Utils.sleep(Long.MAX_VALUE); cluster.shutdown(); } }
三、 Spout
package com.test.csdn; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; /** * Created by Simon on 2018/4/26. */ public class FiledGroupingSpout extends BaseRichSpout{ private SpoutOutputCollector collector; String[] str = {"xiaomi","huawei","apple","oppo","vivo","lenovo","LG", "samsung","htc","honor","nokia","smartisan","Sony","BlackBerry","sharp"}; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector=spoutOutputCollector; } @Override public void nextTuple() { try { Thread.sleep(1000); int i = new Random().nextInt(15); String string = str[i]; //按照是否有o分组 boolean o = string.contains("o"); collector.emit(new Values(o,string)); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("isContainsO","string")); } }
四、 Bolt
package com.test.csdn; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; /** * Created by Simon on 2018/4/26. */ public class FiledGroupingBolt extends BaseBasicBolt{ @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(Thread.currentThread().getName()+"___"+tuple.getValue(1)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
五、 输出结果