目录
一、Storm拓扑的并行度(parallelism)介绍
(1)运行拓扑的结构
工作进程: Worker Process,也称为Worker
执行器: Executor,即线程Thread
任务: Task
工作进程、执行器、任务三者之间关系如下图:
Topology由一个或多个Spout/Bolt组件构成。
运行中的Topology由一个或多个Supervisor节点中的Worker构成。默认情况下一个Supervisor节点运行4个Worker,由defaults.yaml/storm.yaml
中的属性决定:
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
在代码中可以使用new Config().setNumWorkers(3)
,最大数量不能超过配置的supervisor.slots.ports数量。
Executor为特定拓扑的一个或多个组件Spout/Bolt实例运行一个或多个Task。默认情况下一个Executor运行一个Task。
Task执行真正的数据处理,代码中实现的每个Spout/bolt作为很多任务跨集群执行。一个Spout/Bolt组件的Task数量始终贯穿Topology的整个生命周期,但一个Spout/Bolt组件的Executor数量会随着时间而改变。这意味着Threads≤Tasks条件成立。默认情况下Task数量与Executor数量相同,即Storm会使用每个Executor运行一个Task。
(2)配置拓扑的并行度
这里所说的术语“并行度”主要是用于表示所谓的 parallelism_hint,它代表着一个组件的初始 executor (也是线程)数量。在这篇文章里,我们使用这个“并行度”术语来说明在 Storm 拓扑中既可以配置 executor 的数量,也可以配置 worker 和 task 的数量。如果“并行度”的概念需要表示其他的一般情况,我们也会特别指出。
下面的内容里显示了很多可配置选项,以及在代码中配置他们的方法。可以用于配置的方法有很多种,这里列出的只是其中一部分。另外需要注意的是:
Storm 的配置优先级为 defaults.yaml < storm.yaml < 拓扑配置 < 内置型组件信息配置 < 外置型组件信息配置。
(1)工作进程Worker数量
- 说明:拓扑在集群中运行所需要的工作进程数
- 配置选项:TOPOLOGY_WORKERS
- 在代码中使用
Config config = new Config();
//注意此参数不能大于supervisor.slots.ports数量。
config.setNumWorkers(3);
(2)执行器Executor数量
- 说明:每个组件需要的执行线程数
- 配置选项:(没有拓扑级的通用配置项)
- 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint
builder.setSpout(id, spout, parallelism_hint);
//设置Bolt的Executor数量参数parallelism_hint
builder.setBolt(id, bolt, parallelism_hint);
(3)任务Task数量
- 说明:每个组件需要的执行任务数
- 配置选项:TOPOLOGY_TASKS
- 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint,Task数量参数val
builder.setSpout(id, spout, parallelism_hint).setNumTasks(val);
//设置Bolt的Executor数量参数parallelism_hint,Task数量参数val
builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val);
(3)改变运行中拓扑的并行度
Storm一个很好的特性是可以增加或减少工作进程Worker和Executor的数量而不需要重启集群或拓扑,这样的行为成为再平衡(rebalancing)。目前有两种方式可实现拓扑再平衡,如下:
- 使用Storm的WebUI
- 使用Storm的命令行工具,如下
# 重新配置拓扑
# “myTopology” 拓扑使用5个Worker进程
# “blue-spout” Spout使用3个Executor
# “yellow-blot” Bolt使用10个Executor
storm rebalance myTopology -n 5 -e blue-spout=3 -e yellow-blot=10
二、Streaming Groupings流分组策略
数据从上游节点发送到下游节点时,当下游节点的并发度大于1时,我们对下 游节基于多并发情况下接受并处理数据的策略称之为分组策略。
stream grouping就是用来定义一个stream应该如何分配给Bolts上面的多个并发。掌握Shuffle Grouping和 Fields Grouping 即可。
storm里面有6种类型的stream grouping。
1.ShuffleGrouping: 用在非聚合计算,比如过滤、写库等功能性操作 随机派发stream里面的tuple,尽量保证每个bolt并发接收到的tuple数目相同,但不严格相同。0.10之前,shuffleGrouping是轮询分配,即每个bolt得到的数据量相同。
2.FieldsGrouping: 按Field分组进行聚合场景,比如按word来分组, 具有同样word会被分到相同的Bolt。
作用:
- 过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field
- 相同的tuple会分发给同一个Executer或task处理
典型场景: 去重操作、Join
2.Non Grouping: 无分组, 这种分组和Shuffle grouping是一样的效果,多线程下不平均分配。
4.All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
5.Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这 个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理 者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
通过Mapreduce来对比FiledsGrouping
SQL:
select count(1), word from tableName group by word;
MR:
<key, value> —— <key, values> 把相同的key进行了聚合 shuffle混淆以后,相同的key发送到同一个reduce进程(线程)里,才能确保该key进行全局聚合。
数据倾斜根本原因:有的key的value少,有的多,两级分化严重。
Storm:
需要确保相同的key(tuple)必须发送给同一个bolt进程(线程),用fiedsGrouping来实现。
三、基于流分组策略应用程序的开发
案例需求:汇总每天的订单的交易量
MainTopology:
package com.kfk.pro1Grouping;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/28
* @time : 4:47 下午
*/
public class MainTopology {
public static void main(String[] args) {
// 创建Topology
TopologyBuilder builder = new TopologyBuilder();
// set Spout
builder.setSpout("spout",new AmtSpout());
// set Bolt,按Field(time)分组,并设置并行度,是这个组件有几个executor来执行
builder.setBolt("amtBolt",new AmtBolt(),4).fieldsGrouping("spout",new Fields("time"));
builder.setBolt("printBolt",new PrintBolt(),2).shuffleGrouping("amtBolt");
// 设置日志等级
Config conf = new Config();
conf.setDebug(false);
try {
// 本地模式运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("amtTopo", conf, builder.createTopology());
} catch (Exception e){
e.printStackTrace();
}
}
}
AmtSpout:
package com.kfk.pro1Grouping;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/28
* @time : 4:52 下午
*/
public class AmtSpout implements IRichSpout {
Integer[] amt = {
10,20,30,40};
String[] time = {
"2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
String[] city = {
"beijing","nanjing","shenzhen","shanghai","guangzhou"};
String[] product = {
"java","python","c","scala"};
SpoutOutputCollector spoutOutputCollector = null;
Random random = new Random();
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
spoutOutputCollector = collector;
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
// 模拟数据
int _amt = amt[random.nextInt(4)];
String _time = time[random.nextInt(4)];
String _city = city[random.nextInt(5)];
String _product = product[random.nextInt(4)];
// emit给Bolt节点
spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// set Fields
declarer.declare(new Fields("amt","time","city","product"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
AmtBolt:
package com.kfk.pro1Grouping;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/28
* @time : 4:50 下午
*/
public class AmtBolt implements IRichBolt {
Map<String,Integer> amtMap = new HashMap<String,Integer>();
OutputCollector outputCollector = null;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
outputCollector = collector;
}
@Override
public void execute(Tuple input) {
// Bolt业务逻辑处理
String time = input.getStringByField("time");
int amt = Integer.parseInt(input.getStringByField("amt"));
// 累加amt次数
if (amtMap.get(time) != null){
amt += amtMap.get(time);
}
amtMap.put(time,amt);
// emit给Bolt节点
outputCollector.emit(new Values(amtMap));
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("res"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
PrintBolt:
package com.kfk.pro1Grouping;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/28
* @time : 4:51 下午
*/
public class PrintBolt implements IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
System.out.println(input.getValue(0));
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
运行结果:
{
2020-12-27 12:43=43471960, 2020-12-23 12:43=43497930}
{
2020-12-27 12:43=43472120, 2020-12-23 12:43=43498060}
{
2020-12-18 12:43=43479050, 2020-12-25 12:43=43458280}
{
2020-12-18 12:43=43480060, 2020-12-25 12:43=43459440}
{
2020-12-18 12:43=43480120, 2020-12-25 12:43=43459470}
{
2020-12-18 12:43=43480280, 2020-12-25 12:43=43459610}
...
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!