*storm的开发
开发spout类(定义一个类型实现BaseRichSpout抽象类,然后实现其nextTuple方法和open方法)
开发bolt类(定一个类型实现BaseRichBolt抽象类,然后实现其execute方法,declareOutputFields方法)
把spout类和bolt类组装在一块形成一个topology(使用TopologyBuilder来创建topology对象)
把topology发布storm引擎来进行分布式的流处理
StormSubmitter.submitTopology("mytopology", conf, builder.createTopology())
LocalCluster.submitTopology(...)
启动后storm是持续运行的。
启动主节点
storm nimbus>>/tmp/storm_nimbus.log &
storm ui>>/tmp/storm_ui.log &
storm supervisor>>/tmp/storm_supervisor.log &
子节点:
storm supervisor>>/tmp/storm_supervisor.log &
*发布程序到storm集群上运行
1.确保程序中发布任务使用的是StormSubmitter类:
StormSubmitter.submitTopology("wordCount", config, builder.createTopology());
2.打包,把项目打成jar包,放到linux上
3.使用指令把流计算发布到集群上:
storm jar stormtest20.jar com.zhiyou.bd20.storm.WordCount
*Stream groupings(bolt和bolt之间的数据交互方式)
1.Shuffle grouping
随机的把源bolt中数据均衡的发布到目标bolt中
2.Fields grouping
根据源bolt中数据(Tuple)的某个字段作为散布到目标bolt的依据。
这个字段相同的数据(Tuple)一定会发送到同一个目标的bolt中
3.Partial Key grouping
根据某个key值对数据从源bolt中散布到目标bolt中,和Fields grouping一样,不同点在于,这种方式当面临到数据倾斜问题时,他会把倾斜的key再进行散布,如果需要对数据进行聚合,需要多一次聚合bolt操作。
4.All grouping
源bolt中的所有数据会以副本的形式进入每一个目标bolt
每一个目标bolt中接受到的数据量是所有源bolt数据的总量
5.Global grouping
所有源bolt得劲数据都进入一个目标bolt中,进入目标bolt的task任务id最小的那个bolt中
6.None grouping
当不在意数据在bolt和bolt之间的传输过程时可以指定None grouping。散布的效果和Shuffle grouping,过程中输出的消息队列中数据生成和数据消费用的是同一个线程。
7.Direct grouping
输入的数据和输出的数据一样,direct不对数据做改变。
8.Local or shuffle grouping
如果有目标的bolt和源的bolt在同一个物理节点上,则优先这两个bolt对接,源bolt的数据不再发送到其他的目标bolt上
*Trident
它封装了bolt,然后把bolt和spout封装成方法,通过接受算子的形式完成对数据的流计算。
它把流数据从一行数据触发一次流计算过程,封装成多行数据变成一个小批次,每一个批次触发一次流计算,从而trident是一种微批次处理的形式的流处理。