Storm模拟分析电话日志

环境:Storm-1.2.2,ubuntu-16.0.4,Idea2018(Linux版),maven-3.3.9

所有的测试,部署都是在Linux系统上进行。

一、知识点介绍

       Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。
       Storm集群主要由一个主节点(Nimbus后台程序)和一群工作节点(worker node)Supervisor的节点组成,通过 Zookeeper进行协调。Nimbus类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 并且监控状态。
      每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。

        

1、 Nimbus主节点:
     主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。
2、Supervisor工作节点:
      工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。
3、Zookeeper
     Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。
4、Worker:
       运行具体处理组件逻辑的进程。
5、Task:
       worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。

      个人理解为worker中运行进程,进程中运行线程,线程中运行任务。一个线程可以执行多个任务。并发度就等于所有的任务数(Task)之和。

        现在用代码解释一下:        

TopologyBuilder builder = new TopologyBuilder();
        //设置Spout
        builder.setSpout("wcspout", new WordCountSpout(),3).setNumTasks(4);
        //设置creator-Bolt
        builder.setBolt("split-bolt", new SplitBolt(),4).shuffleGrouping("wcspout").setNumTasks(5);
        //设置counter-Bolt
        builder.setBolt("counter-bolt", new CountBolt(),5).fieldsGrouping("split-bolt", new Fields("word")).setNumTasks(6);
        Config conf = new Config();
        conf.setNumWorkers(2);

        该代码开启了2个worker进程(worker本身不执行Task(任务),它相当于领导,用于产生executor,让executor去执行任务)。给wcspout分配了3个线程4个任务,给split-bolt分配了4个线程5个任务,给counter-bolt分配了5个线程6个任务。

        进程/线程/任务都是平均的。因此对于上面的代码来说,假如开了1个supervisor,那么这2个worker就都由这一个supervisor监管,假如开了2个supervisor,那么每个supervisor管理一个worker。用图形来表示上述代码的任务分配如下:


    每一个task运行一个对象实例。。

6、Topology(拓扑):
       storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来,如下图:


一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务, 并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。
运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:
      storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar包。
Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。

7、Spout:
       消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。
       消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
      而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。
       要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
8、Bolt:
     Topology中所有的处理都由Bolt完成。即所有的消息处理逻辑被封装在bolts里面。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。
        Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。即需要经过很多blots。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
        Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
      而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。
     bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple,  发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
9、Tuple:
       一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.
10、Stream:
        源源不断传递的tuple就组成了stream。消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定义类型(只要实现相应的序列化器)。
     每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id 。
      Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。
       
11、Stream Groupings:
Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有 Storm提供的6个Stream Grouping类型:
1). 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
2). 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
3). 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
4). 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
5). 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
6). 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。

storm 和hadoop的对比来了解storm中的基本概念。

二、代码


1.CallLogSpout类

该类用于模拟产生数据源

package com.strorm.test;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * @Author zhang
 * @Date 18-6-7 下午2:40
 * Spout类,负责产生数据流
 */
public class CallLogSpout implements IRichSpout {

    //Spout输出收集器
    private SpoutOutputCollector collector;
    //是否完成
    private boolean completed=false;
    //上下文
    private TopologyContext context;
    //随机发生器
    private Random randomGenerator = new Random();

    private Integer idx=0;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context=topologyContext;
        this.collector=spoutOutputCollector;
    }

    public void close() {

    }

    public void activate() {

    }

    public void deactivate() {

    }

    /**
     * 下一个元组
     */
    public void nextTuple() {
        if (idx<=1000){
            List<String> mobileNumbers=new ArrayList<String>();
            mobileNumbers.add("13901645322");
            mobileNumbers.add("13805376831");
            mobileNumbers.add("13500803713");
            mobileNumbers.add("15988321818");
            Integer localIndex=0;
            while (localIndex++<100 && idx<1000){
                //主叫
                String caller=mobileNumbers.get(randomGenerator.nextInt(4));
                //被叫
                String callee=mobileNumbers.get(randomGenerator.nextInt(4));
                while (caller==callee){
                    //主叫与被叫不能相同,重新赋值被叫
                    callee=mobileNumbers.get(randomGenerator.nextInt(4));
                }
                //模拟通话时间
                Integer callTime=randomGenerator.nextInt(60);
                //输出元组
                this.collector.emit(new Values(caller,callee,callTime));

            }
        }

    }

    public void ack(Object o) {

    }

    /**
     *
     * @param o
     */
    public void fail(Object o) {

    }

    /**
     * 定义输出字段
     * @param outputFieldsDeclarer
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //输出元组中到元素为三个,这也要定义三个字段
        outputFieldsDeclarer.declare(new Fields("from","to","callTime"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

2.CallLogBolt类

该类用于处理从Spout类传递过来的源数据,可以实现多个IRichBolt类进行连续处理。

package com.strorm.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @Author zhang
 * @Date 18-6-7 下午3:19
 * 创建Bolt
 */
public class CallLogBolt implements IRichBolt {

    private OutputCollector collector;
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector=outputCollector;
    }

    public void execute(Tuple tuple) {
        //处理通话记录
        String from=tuple.getString(0);
        String to=tuple.getString(1);
        Integer callTime=tuple.getInteger(2);
        collector.emit(new Values(from+" 呼叫 "+to,callTime));
    }

    public void cleanup() {

    }

    /**
     * 定义输出字段
     * @param outputFieldsDeclarer
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call","callTime"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

3.CallCounterBolt类

该类用于统计之前的Bolt类发送过来的数据,其功能类似于Hadoop的Reducer。

package com.strorm.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author zhang
 * @Date 18-6-7 下午3:29
 * 计数器,类似于Reducer
 */
public class CallCounterBolt implements IRichBolt {
    Map<String,Integer> counterMap;
    OutputCollector collector;
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap=new HashMap<String, Integer>();
        collector=outputCollector;
    }

    public void execute(Tuple tuple) {
        String call=tuple.getString(0);
        Integer callTime=tuple.getInteger(1);
        if (!counterMap.containsKey(call)){
            counterMap.put(call,callTime);
        }else {
            Integer integer=counterMap.get(call)+callTime;
            counterMap.put(call,integer);
        }
        collector.ack(tuple);
    }

    public void cleanup() {
        for (Map.Entry<String,Integer> map : counterMap.entrySet()){
            System.out.println(map.getKey()+"   :"+map.getValue()+" 分钟");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4.CallLogOutput类

该类用于提交Topology,类似于Hadoop的Job提交。由于Storm不会停止数据流作业,所以在测试环境下要人为停止。这里的Sleep时间可以根据实际情况设定。如果时间过短,可能看不到输出结果。

package com.strorm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @Author zhang
 * @Date 18-6-7 下午3:46
 */
public class CallLogOutput {

    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder topologyBuilder=new TopologyBuilder();
        //设置Spout
        topologyBuilder.setSpout("spout",new CallLogSpout());
        //设置Bolt
        topologyBuilder.setBolt("bolt",new CallLogBolt()).shuffleGrouping("spout");
        //设置counterBolt
        topologyBuilder.setBolt("counterBolt",new CallCounterBolt()).fieldsGrouping("bolt",new Fields("call"));
        Config config=new Config();
        config.setDebug(true);
        LocalCluster localCluster=new LocalCluster();
        localCluster.submitTopology("Log",config,topologyBuilder.createTopology());
        Thread.sleep(20000);
        localCluster.shutdown();
    }
}

5.输出结果


根据代码中的随机产生4个号码。再根据排列组合的

可以知道输出是正确的。

6.集群部署执行

集群部署执行需要将localCluster提交改成StormSubmitter.submitTopology提交。并将相关的Module打包成jar包。

以本文的代码为例,部署时,执行:

storm jar storm-call-core-1.0-SNAPSHOT.jar com.strorm.test.CallLogOutput



在上图中,可以看到提交的Topology名字为Log,就是代码中设定的Topology名字。


拓扑图:


可以看到上图中三个红色圈中拓扑节点的名字就是代码中设定的名字。



然后回到web ui的主页可以看到Topology已经没有数据信息了,表示没有Topology在执行。


标准的执行格式为:

storm jar jar包名  xxx.类名 [arg1] [arg2] [arg3]...

当在集群上执行了jar包后,会在某个supervisor的目录下,产生一个worker.log,这个日志文件里面记录了输出结果,但是该文件一般会非常的大。



源代码下载:点击下载storm电话日志分析源代码

下载源码完成后,只需要以导入maven工程的方式导入root目录下的 pom.xml文件,然后会自动引入所有相关的module。

猜你喜欢

转载自blog.csdn.net/scgh_fx/article/details/80615010