源头 Spout
public class WordCountSpout implements IRichSpout {
private TopologyContext context ;
private SpoutOutputCollector collector ;
private List<String> states ;
private Random r = new Random();
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context ;
this.collector = collector ;
states = new ArrayList<String>();
states.add("hello world tom");
states.add("hello world tomas");
states.add("hello world tomasLee");
states.add("hello world tomson");
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
public void nextTuple() {
String line = states.get(r.nextInt(4));
collector.emit(new Values(line));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
分片 Bolt
public class SplitBolt implements IRichBolt {
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
Util.sendToClient(this, "prepare()", 8888);
this.context = context;
this.collector = collector;
}
public void execute(Tuple tuple) {
Util.sendToClient(this, "execute()", 8888);
String line = tuple.getString(0);
String[] arr = line.split(" ");
for (String s : arr) {
collector.emit(new Values(s, 1));
}
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
计数 Bolt
public class CountBolt implements IRichBolt {
private Map<String, Integer> map;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
Util.sendToClient(this, "prepare()", 9999);
this.context = context;
this.collector = collector;
map = new HashMap<String, Integer>();
}
public void execute(Tuple tuple) {
Util.sendToClient(this, "execute()", 9999);
String word = tuple.getString(0);
Integer count = tuple.getInteger(1);
if (!map.containsKey(word)) {
map.put(word, 1);
} else {
map.put(word, map.get(word) + count);
}
}
public void cleanup() {
for (Map.Entry<String, Integer> entry : map.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
WCApp
public class WCApp {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wcspout", new WordCountSpout(), 3).setNumTasks(3);
builder.setBolt("split-bolt", new SplitBolt(), 4).shuffleGrouping("wcspout").setNumTasks(4);
builder.setBolt("counter-bolt", new CountBolt(), 5).fieldsGrouping("split-bolt", new Fields("word")).setNumTasks(5);
Config conf = new Config();
conf.setNumWorkers(3);
conf.setDebug(true);
StormSubmitter.submitTopology("wordcount", conf, builder.createTopology());
}
}