以 WordCount 为例
代码示例如下:
//源头Spout
public class WordCountSpout implements IRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private List<String> states;
    private Random r = new Random();

    /**
     * Called once when the spout task starts: caches the Storm-provided
     * context/collector and builds the fixed pool of sample sentences.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Util.sendToClient(this,"open()",7777);
        this.context = context;
        this.collector = collector;
        states = new ArrayList<String>();
        states.add("hello world tom");
        states.add("hello world tomas");
        states.add("hello world tomasLee");
        states.add("hello world tomson");
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /**
     * Emits one randomly chosen sentence roughly once per second on the
     * default stream (single field "line").
     */
    public void nextTuple() {
        // Util.sendToClient(this, "nextTuple()",7777);
        // Bound the random index by states.size() instead of a hard-coded 4,
        // so adding/removing sample sentences cannot throw IndexOutOfBounds.
        String line = states.get(r.nextInt(states.size()));
        collector.emit(new Values(line));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it, so the
            // worker can shut the spout thread down cleanly.
            Thread.currentThread().interrupt();
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    /** Declares the single output field consumed by downstream bolts. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
//分片 Bolt
public class SplitBolt implements IRichBolt {
    private TopologyContext context;
    private OutputCollector collector;

    /** Caches the Storm context and collector handed in at task startup. */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        Util.sendToClient(this, "prepare()", 8888);
        this.context = context;
        this.collector = collector;
    }

    /**
     * Splits the incoming line on single spaces and emits one
     * (word, 1) pair per token.
     */
    public void execute(Tuple tuple) {
        Util.sendToClient(this, "execute()", 8888);
        String[] words = tuple.getString(0).split(" ");
        for (int i = 0; i < words.length; i++) {
            collector.emit(new Values(words[i], 1));
        }
    }

    public void cleanup() {
    }

    /** Downstream bolts see two fields: the word and its count increment. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
//计数 Bolt
public class CountBolt implements IRichBolt {
    private TopologyContext context;
    private OutputCollector collector;
    // Running per-word totals. With fieldsGrouping("word") every tuple for a
    // given word is routed to the same task, so a plain HashMap suffices here.
    private Map<String, Integer> map;

    /** Caches the Storm context/collector and initializes the count table. */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.collector = collector;
        map = new HashMap<String, Integer>();
    }

    /**
     * Accumulates the (word, count) increment emitted by SplitBolt into the
     * running total for that word.
     */
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = tuple.getInteger(1);
        Integer old = map.get(word);
        map.put(word, old == null ? count : old + count);
    }

    /** Prints the final totals when the topology is torn down. */
    public void cleanup() {
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Shuffle grouping(随机分组)
随机地将 tuple 分发给 Bolt 的各个 task,使每个 Bolt 实例接收到数量大致相同的 tuple
public class WCApp {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Spout: parallelism hint = 3 executors, pinned to 3 tasks
        // (a task is one component instance; executors are the threads running them).
        builder.setSpout("wcspout", new WordCountSpout(), 3).setNumTasks(3);

        // Splitter bolt: shuffle grouping distributes lines evenly across tasks.
        builder.setBolt("split-bolt", new SplitBolt(), 4)
                .shuffleGrouping("wcspout")
                .setNumTasks(4);

        // Counter bolt: fields grouping by "word" sends every occurrence of a
        // word to the same task so counts stay consistent.
        builder.setBolt("counter-bolt", new CountBolt(), 5)
                .fieldsGrouping("split-bolt", new Fields("word"))
                .setNumTasks(5);

        Config conf = new Config();
        // Number of worker processes (JVMs) across the cluster.
        conf.setNumWorkers(3);
        conf.setDebug(true);

        // Cluster-mode submission.
        StormSubmitter.submitTopology("wordcount", conf, builder.createTopology());
    }
}
Fields grouping(按字段分组)
根据指定字段的值进行分组。例如:流按照“user-id”进行分组,那么具有相同“user-id”的tuple会发到同一个task,而具有不同“user-id”值的tuple可能会发到不同的task上。这种分组常用于单词计数,但实际中较少直接使用:如果某个字段的某个取值出现次数过多,就会导致task负载不均衡,出现数据倾斜问题;通常通过“先随机分组做一次局部聚合、再按字段分组做二次聚合”的方式来避免。
//计数 Bolt
//一次聚合和二次聚合使用field分组,完成数据的最终统计
// Counting bolt used for both aggregation stages: accumulates per-word totals
// in a map and flushes them downstream from a background timer thread every
// ~5 seconds, clearing the map after each flush.
public class CountBolt implements IRichBolt{
// Per-word running totals shared between the execute thread and the flush thread.
private Map<String,Integer> map ;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.context = context;
this.collector = collector;
map = new HashMap<String, Integer>();
// Wrap the map so individual get/put calls are synchronized; compound
// read-modify-write in execute() is still not atomic (see note there).
map = Collections.synchronizedMap(map);
// Background thread that periodically flushes accumulated counts downstream.
// NOTE(review): Storm's OutputCollector is not documented as thread-safe;
// emitting from a thread other than the executor thread should be confirmed
// against the Storm version in use.
Thread t = new Thread(){
public void run() {
// Loops forever; emitData() itself sleeps 5s per cycle, so this is
// effectively a fixed-interval flush, not a busy loop.
while(true){
emitData();
}
}
};
// Daemon thread so it does not keep the worker JVM alive at shutdown.
t.setDaemon(true);
t.start();
}
// Emits every (word, total) entry downstream, clears the map, then sleeps 5s.
private void emitData(){
// Lock the map so execute() cannot interleave between the emit loop and clear().
synchronized (map){
for (Map.Entry<String, Integer> entry : map.entrySet()) {
// Forward the current total for this word to the next stage.
collector.emit(new Values(entry.getKey(), entry.getValue()));
}
// Reset totals for the next window.
map.clear();
}
// Pace the flush loop; sleeping outside the lock keeps execute() unblocked.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// NOTE(review): interrupt is swallowed and the flag not restored;
// consider Thread.currentThread().interrupt() so shutdown can proceed.
e.printStackTrace();
}
}
public void execute(Tuple tuple) {
// Extract the word (field 0) and its count increment (field 1).
String word = tuple.getString(0);
Util.sendToLocalhost(this, word);
Integer count = tuple.getInteger(1);
// NOTE(review): containsKey + put is not atomic even on a synchronized map;
// harmless only because execute() runs on a single executor thread and the
// flush thread holds the map lock while mutating.
if(!map.containsKey(word)){
map.put(word, count);
}
else{
map.put(word,map.get(word) + count);
}
}
public void cleanup() {
// Dump whatever totals remain un-flushed when the topology shuts down.
for(Map.Entry<String,Integer> entry : map.entrySet()){
System.out.println(entry.getKey() + " : " + entry.getValue());
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public class WCApp {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Sentence source.
        builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);

        // Stage 1 — split lines into words; shuffle grouping.
        builder.setBolt("split-bolt", new SplitBolt(), 1)
                .shuffleGrouping("wcspout")
                .setNumTasks(1);

        // Stage 2 — first (partial) aggregation; shuffle grouping spreads load
        // so a hot key cannot skew a single task.
        builder.setBolt("counter-1", new CountBolt(), 1)
                .shuffleGrouping("split-bolt")
                .setNumTasks(1);

        // Stage 3 — final aggregation; fields grouping by "word" guarantees
        // all partial counts for a word land on the same task.
        builder.setBolt("counter-2", new CountBolt(), 3)
                .fieldsGrouping("counter-1", new Fields("word"))
                .setNumTasks(3);

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setDebug(true);

        // Local-mode run for testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc", conf, builder.createTopology());
    }
}
All grouping(全复制分组)
将所有的tuple都复制之后再分发给Bolt所有的task,每一个订阅数据流的task都会接收到一份相同的完全的tuple的拷贝
public class WCApp {
    /**
     * All-grouping demo: every task of split-bolt receives a full copy of
     * every tuple the spout emits.
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Sentence source.
        builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);
        // allGrouping replicates each tuple to every subscribed task.
        builder.setBolt("split-bolt", new SplitBolt(), 2)
                .allGrouping("wcspout")
                .setNumTasks(2);

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setDebug(true);

        // Local-mode run for testing.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc", conf, builder.createTopology());
        // Removed leftover debug print ("hello world llll") that had no
        // relation to the topology.
    }
}
Direct grouping(特定指向分组)
给指定的 Bolt 发送
以这种方式分组的流,由元组的生产者决定由消费者的哪个task接收该元组。指向分组只能用于已经声明为直接流(direct stream)的数据流,并且元组必须使用emitDirect系列方法发射。Bolt可以通过TopologyContext获取其消费者的taskID,也可以通过跟踪OutputCollector的emit方法的返回值(返回元组被发送到的taskID列表)来获取。
public class WCApp {
// Direct-grouping demo: the producer decides which consumer task receives
// each tuple.
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
//设置Spout
builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(2);
//设置creator-Bolt
// NOTE(review): directGrouping requires the producer (WordCountSpout) to
// declare a direct stream and emit via emitDirect(taskId, ...); the spout
// shown earlier in this file uses plain emit() on the default stream, so
// this wiring will not deliver tuples as written — confirm against the
// Storm version in use and adjust the spout accordingly.
builder.setBolt("split-bolt", new SplitBolt(),2).directGrouping("wcspout").setNumTasks(2);
Config conf = new Config();
conf.setNumWorkers(2);
conf.setDebug(true);
/**
* Local-mode run for testing.
*/
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wc", conf, builder.createTopology());
}
}
Global grouping(全局分组)
这种分组会将所有的tuple都发到一个taskid最小的task上。由于所有的tuple都发到唯一一个task上,势必在数据量大的时候会造成资源不够用的情况
None grouping(不分组)
这种分组表示不关心数据流如何分组。目前来说,不分组和随机分组效果是一样的;但最终,Storm可能会让与其订阅的bolt或spout处于同一进程内的task来执行这些tuple
Local or shuffle grouping(本地或随机分组)
和随机分组类似,但是如果目标Bolt在同一个工作进程中有一个或多个任务,那么元组将被随机分配到那些进程内task。简而言之就是如果发送者和接受者在同一个worker则会减少网络传输,从而提高整个拓扑的性能。有了此分组就完全可以不用shuffle grouping了
Partial Key grouping(部分字段分组)
流按分组中指定的字段分区,与“字段分组”类似,但会在两个下游Bolt之间进行负载均衡;当输入数据倾斜时,可以更好地利用资源