Storm拓扑并行度与及流分组策略

一、Storm拓扑的并行度(parallelism)介绍

(1)运行拓扑的结构

工作进程: Worker Process,也称为Worker
执行器: Executor,即线程Thread
任务: Task

工作进程、执行器、任务三者之间关系如下图:
在这里插入图片描述
Topology由一个或多个Spout/Bolt组件构成。

运行中的Topology由一个或多个Supervisor节点中的Worker构成。默认情况下一个Supervisor节点运行4个Worker,由defaults.yaml/storm.yaml中的属性决定:

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

在代码中可以使用new Config().setNumWorkers(3),最大数量不能超过配置的supervisor.slots.ports数量。

Executor为特定拓扑的一个或多个组件Spout/Bolt实例运行一个或多个Task。默认情况下一个Executor运行一个Task。

Task执行真正的数据处理,代码中实现的每个Spout/bolt作为很多任务跨集群执行。一个Spout/Bolt组件的Task数量始终贯穿Topology的整个生命周期,但一个Spout/Bolt组件的Executor数量会随着时间而改变。这意味着Threads≤Tasks条件成立。默认情况下Task数量与Executor数量相同,即Storm会使用每个Executor运行一个Task。

(2)配置拓扑的并行度

这里所说的术语“并行度”主要是用于表示所谓的 parallelism_hint,它代表着一个组件的初始 executor (也是线程)数量。在这篇文章里,我们使用这个“并行度”术语来说明在 Storm 拓扑中既可以配置 executor 的数量,也可以配置 worker 和 task 的数量。如果“并行度”的概念需要表示其他的一般情况,我们也会特别指出。

下面的内容里显示了很多可配置选项,以及在代码中配置他们的方法。可以用于配置的方法有很多种,这里列出的只是其中一部分。另外需要注意的是:

Storm 的配置优先级为 defaults.yaml < storm.yaml < 拓扑配置 < 内置型组件信息配置 < 外置型组件信息配置。

扫描二维码关注公众号,回复: 12644552 查看本文章

(1)工作进程Worker数量

  • 说明:拓扑在集群中运行所需要的工作进程数
  • 配置选项:TOPOLOGY_WORKERS
  • 在代码中使用
Config config = new Config();
//注意此参数不能大于supervisor.slots.ports数量。
config.setNumWorkers(3);    

(2)执行器Executor数量

  • 说明:每个组件需要的执行线程数
  • 配置选项:(没有拓扑级的通用配置项)
  • 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint
builder.setSpout(id, spout, parallelism_hint);
//设置Bolt的Executor数量参数parallelism_hint        
builder.setBolt(id, bolt, parallelism_hint);        

(3)任务Task数量

  • 说明:每个组件需要的执行任务数
  • 配置选项:TOPOLOGY_TASKS
  • 在代码中使用
TopologyBuilder builder = new TopologyBuilder();
//设置Spout的Executor数量参数parallelism_hint,Task数量参数val
builder.setSpout(id, spout, parallelism_hint).setNumTasks(val);  
//设置Bolt的Executor数量参数parallelism_hint,Task数量参数val     
builder.setBolt(id, bolt, parallelism_hint).setNumTasks(val);            

(3)改变运行中拓扑的并行度

Storm一个很好的特性是可以增加或减少工作进程Worker和Executor的数量而不需要重启集群或拓扑,这样的行为成为再平衡(rebalancing)。目前有两种方式可实现拓扑再平衡,如下:

  • 使用Storm的WebUI
  • 使用Storm的命令行工具,如下
# 重新配置拓扑
# “myTopology” 拓扑使用5个Worker进程
# “blue-spout” Spout使用3个Executor
# “yellow-blot” Bolt使用10个Executor
storm rebalance myTopology -n 5 -e blue-spout=3 -e yellow-blot=10

二、Streaming Groupings流分组策略

数据从上游节点发送到下游节点时,当下游节点的并发度大于1时,我们对下 游节基于多并发情况下接受并处理数据的策略称之为分组策略。
在这里插入图片描述
stream grouping就是用来定义一个stream应该如何分配给Bolts上面的多个并发。掌握Shuffle Grouping和 Fields Grouping 即可。

storm里面有6种类型的stream grouping。

1.ShuffleGrouping: 用在非聚合计算,比如过滤、写库等功能性操作 随机派发stream里面的tuple,尽量保证每个bolt并发接收到的tuple数目相同,但不严格相同。0.10之前,shuffleGrouping是轮询分配,即每个bolt得到的数据量相同。

2.FieldsGrouping: 按Field分组进行聚合场景,比如按word来分组, 具有同样word会被分到相同的Bolt。

作用:

  • 过滤,从源端(Spout或上一级Bolt)多输出Fields中选择某些Field
  • 相同的tuple会分发给同一个Executer或task处理

典型场景: 去重操作、Join

2.Non Grouping: 无分组, 这种分组和Shuffle grouping是一样的效果,多线程下不平均分配。

4.All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。

5.Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者决定由消息接收者的哪个task处理这 个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理 者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

通过Mapreduce来对比FiledsGrouping
SQL:

select count(1), word from tableName group by word;

MR:
<key, value> —— <key, values> 把相同的key进行了聚合 shuffle混淆以后,相同的key发送到同一个reduce进程(线程)里,才能确保该key进行全局聚合。
数据倾斜根本原因:有的key的value少,有的多,两级分化严重。

Storm:
需要确保相同的key(tuple)必须发送给同一个bolt进程(线程),用fiedsGrouping来实现。

三、基于流分组策略应用程序的开发

案例需求:汇总每天的订单的交易量

MainTopology:

package com.kfk.pro1Grouping;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author : 蔡政洁
 * @email :[email protected]
 * @date : 2020/12/28
 * @time : 4:47 下午
 */
public class MainTopology {
    
    
    public static void main(String[] args) {
    
    

        // 创建Topology
        TopologyBuilder builder = new TopologyBuilder();

        // set Spout
        builder.setSpout("spout",new AmtSpout());
        // set Bolt,按Field(time)分组,并设置并行度,是这个组件有几个executor来执行
        builder.setBolt("amtBolt",new AmtBolt(),4).fieldsGrouping("spout",new Fields("time"));
        builder.setBolt("printBolt",new PrintBolt(),2).shuffleGrouping("amtBolt");

        // 设置日志等级
        Config conf = new Config();
        conf.setDebug(false);

        try {
    
    
            // 本地模式运行
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("amtTopo", conf, builder.createTopology());
        } catch (Exception e){
    
    
            e.printStackTrace();
        }
    }
}

AmtSpout:

package com.kfk.pro1Grouping;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

/**
 * @author : 蔡政洁
 * @email :[email protected]
 * @date : 2020/12/28
 * @time : 4:52 下午
 */
public class AmtSpout implements IRichSpout {
    
    

    Integer[] amt = {
    
    10,20,30,40};
    String[] time = {
    
    "2020-12-27 12:43","2020-12-25 12:43","2020-12-23 12:43","2020-12-18 12:43"};
    String[] city = {
    
    "beijing","nanjing","shenzhen","shanghai","guangzhou"};
    String[] product = {
    
    "java","python","c","scala"};

    SpoutOutputCollector spoutOutputCollector = null;

    Random random = new Random();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    
    

        spoutOutputCollector = collector;
    }

    @Override
    public void close() {
    
    

    }

    @Override
    public void activate() {
    
    

    }

    @Override
    public void deactivate() {
    
    

    }

    @Override
    public void nextTuple() {
    
    

        // 模拟数据
        int _amt = amt[random.nextInt(4)];
        String _time = time[random.nextInt(4)];
        String _city = city[random.nextInt(5)];
        String _product = product[random.nextInt(4)];

        // emit给Bolt节点
        spoutOutputCollector.emit(new Values(String.valueOf(_amt),_time,_city,_product));

    }

    @Override
    public void ack(Object msgId) {
    
    

    }

    @Override
    public void fail(Object msgId) {
    
    

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
    

        // set Fields
        declarer.declare(new Fields("amt","time","city","product"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    
    
        return null;
    }
}

AmtBolt:

package com.kfk.pro1Grouping;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :[email protected]
 * @date : 2020/12/28
 * @time : 4:50 下午
 */
public class AmtBolt implements IRichBolt {
    
    


    Map<String,Integer> amtMap = new HashMap<String,Integer>();

    OutputCollector outputCollector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
    
        outputCollector = collector;
    }

    @Override
    public void execute(Tuple input) {
    
    

        // Bolt业务逻辑处理
        String time = input.getStringByField("time");
        int amt = Integer.parseInt(input.getStringByField("amt"));

        // 累加amt次数
        if (amtMap.get(time) != null){
    
    
            amt += amtMap.get(time);
        }
        amtMap.put(time,amt);

        // emit给Bolt节点
        outputCollector.emit(new Values(amtMap));
    }

    @Override
    public void cleanup() {
    
    

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
    
        declarer.declare(new Fields("res"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    
    
        return null;
    }
}

PrintBolt:

package com.kfk.pro1Grouping;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

/**
 * @author : 蔡政洁
 * @email :[email protected]
 * @date : 2020/12/28
 * @time : 4:51 下午
 */
public class PrintBolt implements IRichBolt {
    
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
    

    }

    @Override
    public void execute(Tuple input) {
    
    
        System.out.println(input.getValue(0));

    }

    @Override
    public void cleanup() {
    
    

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
    

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
    
    
        return null;
    }
}

运行结果:

{
    
    2020-12-27 12:43=43471960, 2020-12-23 12:43=43497930}
{
    
    2020-12-27 12:43=43472120, 2020-12-23 12:43=43498060}
{
    
    2020-12-18 12:43=43479050, 2020-12-25 12:43=43458280}
{
    
    2020-12-18 12:43=43480060, 2020-12-25 12:43=43459440}
{
    
    2020-12-18 12:43=43480120, 2020-12-25 12:43=43459470}
{
    
    2020-12-18 12:43=43480280, 2020-12-25 12:43=43459610}
...

以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

猜你喜欢

转载自blog.csdn.net/weixin_45366499/article/details/111876033