Storm
- 第一章 是什么
- 第二章 Storm编程案例
- 第三章 Storm Grouping
- 1. Shuffle Grouping
- 2. Fields Grouping
- 3. All Grouping
- 4. Global Grouping
- 5. None Grouping
- 6. Direct Grouping
- 7. Local or shuffle grouping
- 8. customGrouping
- 第四章 安装
- 第五章 深入理解Storm
- 第六章 Flume-Kafka-Storm整合案例实现
第一章 是什么
一 介绍
Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github. 2013年,Storm进入Apache社区进行孵化. 2014年9月,晋级成为了Apache顶级项目. 国内外各大网站使用,例如雅虎、阿里、百度
官网 http://storm.apache.org/
特点
-
Storm是个实时的、分布式以及具备高容错的计算系统
-
Storm进程常驻内存
-
Storm数据不经过磁盘,在内存中处理
-
高可靠性
异常处理
消息可靠性保障机制(ACK) -
可维护性
StormUI 图形化监控接口
注意:
- MapReduce无法做到实时处理, 制约因素是数据量级大, 分布式计算, IO操作(浪费时间)
- 分布式能够解决单点故障
二 拓扑流程
组件说明
- spout : 相当于数据源
- tuple : 相当于元数据
- bolt : 数据处理的最小单位, 只负责处理一部分处理逻辑, bolt异步多线程处理, 最后再汇总
拓扑图
架构
详细说明见第四章第一节
- Nimbus: 资源分配,任务调度, 上传jar ( 类比老板 )
- Supervisor : 开启或进程 ( 类比包工头,根据ZK分配信息决定 )
- Worker: 位于Supervisor节点上, 而且可以有多个, 每个Worker可以接一个或多个任务(Task),根据自身的能力和业务复杂度处理{ Task: 包括 bolt(逻辑单元处理) 和spout(推送数据) }
数据结构
-
ZMQ(twitter早期产品)
ZeroMQ 开源的消息传递框架,并不是一个MessageQueue -
Storm使用Netty进行传输, Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
流式处理
-
流式处理(异步 与 同步)
客户端提交数据进行结算,并不会等待数据计算结果 -
逐条处理
例:ETL(数据清洗)extracted transform load -
统计分析
例:计算PV、UV、访问热点 以及某些数据的聚合、加和、平均等
客户端提交数据之后,计算完成结果存储到Redis、HBase、MySQL或者其他MQ当中,
客户端并不关心最终结果是多少。
有向无环图(DAG,directed acyclic graph): 起始点一定是spout, 终点一定是 bolt, 拓扑有方向, 如下图
实时处理
-
实时请求应答服务(同步)
客户端提交数据请求之后,立刻取得计算结果并返回给客户端 -
Drpc: distributed remote procedure call, 分布式远程过程/服务调用.
-
实时请求处理
例:图片特征提取
三 性能对比
Storm 与MapReduce的关系
- Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
- MapReduce:为TB、PB级别数据设计的批处理计算框架。
Storm 与 Spark Streaming 的关系
-
Storm:纯流式处理
专门为流式处理设计
数据传输模式更为简单,很多地方也更为高效
并不是不能做批处理,它也可以来做微批处理,来提高吞吐 -
Spark Streaming:微批处理
将RDD做的很小来用小的批处理来接近流式处理
基于内存和DAG可以把处理任务做的很快
四 计算模型
1.Topology(译为拓扑结构) – DAG有向无环图的实现
- 对于Storm实时计算逻辑的封装. 即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
- 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止
(区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)
2.Tuple – 元组
- Stream中最小数据组成单元
3.Stream – 数据流
- 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream
- Stream声明时需给其指定一个Id(默认为Default)
- 实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId
4.Spout – 数据源
-
拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
-
一个Spout可以发送多个数据流(Stream)
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去 -
Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算
5.Bolt – 数据流处理组件
-
拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
-
一个Bolt可以发送多个数据流(Stream)
可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去 -
Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑
6.Stream Grouping – 数据流分组(即数据分发策略)
注意: 1,4,5,6 在Storm开发中经常用到
第二章 Storm编程案例
环境准备, 案例用到的jar在底部分享, 下载后在项目下创建一个lib目录, 然后右击bulild path全部即可
一 WordSum ( 数据累加 )
Spout
用于数据的推送
这里是将每个i 的值推送给 bolt 进行处理
package ah.szxy.storm.bolt;
import java.util.List;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
* 手动继承 BaseRichSpout, 实现它的未实现方法
* @author chy
*/
public class WsSpout extends BaseRichSpout{
private Map map;
private TopologyContext context;
private SpoutOutputCollector collector;
int i=0;//nextTuple方法会被循环调用,因此i应该是成员变量
/**
* 1.配置初始化spout
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.map=map;
this.context=context;
this.collector=collector;
}
/**
* 2.采集并且向后推送数据
*/
@Override
public void nextTuple() {
/**
* 这里体现了面向接口的核心思想
* 如果声明直接使用Values, 接收数据的类型就会被限制死了
*/
List list = new Values(i++);
this.collector.emit(list);
System.err.println("num==========="+list);
Utils.sleep(1000);//和线程休眠效果一样,storm包提供
}
/**
* 3.向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("num"));
}
}
Bolt
用于对spout的数据进行逻辑处理
这里是对数据进行求和
package ah.szxy.storm.spout;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 继承BaseRichBolt, 实现相关方法
* @author chy
*
*/
public class WsBolt extends BaseRichBolt{
//成员变量
private Map stormConf;
private TopologyContext comtext;
private OutputCollector collector;
//求和
int sum=0;
/**
* 准备阶段(提供逻辑运算的环境)
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.stormConf=stormConf;
this.comtext=context;
this.collector=collector;
}
/**
* 获取数据 ( 有必要的话, 向后继续发送数据 )
*/
@Override
public void execute(Tuple input) {
// input.getInteger(0);
int num=input.getIntegerByField("num");//接收的是spout类中declareOutputFields方法声明的字段名称
sum+=num;
System.err.println("sum========================="+sum);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Test
构建拓扑结构模型
测试程序是否正常运行
package ah.szxy.storm.test;
import ah.szxy.storm.bolt.WsSpout;
import ah.szxy.storm.spout.WsBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class TestWs {
/**
* 建立拓扑结构, 加入集群中运行
* @param args 命令行参数
*/
public static void main(String[] args) {
//构建storm拓扑结构( Topology: 拓扑结构)
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("wsspout", new WsSpout());
tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout");
//创建本地storm集群
LocalCluster lc=new LocalCluster();
//将任务布置到集群中运行
lc.submitTopology("wordsum", new Config(), tb.createTopology());
}
}
注意:
- 由结果可以看出, 执行一次spout就会执行一次bolt操作
- 而且他们顺序有时候会颠倒, 原因是他们执行的是异步nio(多线程并行,谁快谁先执行)操作而不是串行操作, 但是最后的结果不会受到影响
二 WordCount
Spout
需要注意的是这里采取了随机的方式推送数据
因此下面在结果打印时, 打印的数据可能相同
/**
* spout数据推送
* @author chy
*
*/
public class WcSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
//定义需要被统计字符串数据
String[] text= {
"I am a walker",
"I like play computer and comic",
"I like study and sing",
"My nickname is TimePause",
"TimePause is not simple history"
};
//定义一个随机数变量r
Random r=new Random();
/**
* 初始化方法
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf=conf;
this.context=context;
this.collector=collector;
}
/**
* 采集并向后推送数据
*/
@Override
public void nextTuple() {
//从数组中随机取出一行,放到list集合中
List line=new Values(text[r.nextInt(text.length)]);
//推送数据
this.collector.emit(line);
System.err.println("spout emit line========"+line);
Utils.sleep(1000);
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
Bolt
/**
* 第一个Bolt---进行分词
* @author chy
*
*/
public class WcSplitBolt extends BaseRichBolt{
Map stormConf;
TopologyContext context;
OutputCollector collector;
/**
* 准备阶段(提供逻辑运算的环境)
* 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.stormConf=stormConf;
this.context=context;
this.collector=collector;
}
/**
* 获取tuple元祖中每一行数据并切割
* @param input
*/
@Override
public void execute(Tuple input) {
//input.getString(0);//通过偏移量获取
String line=input.getStringByField("line");
//切割
String[] words = line.split(" ");
for (String word : words) {
List wordList=new Values(word);
this.collector.emit(wordList);//发送数据
}
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("wordList"));
}
}
/**
* 第二个Bolt---分词后的统计
* @author chy
*
*/
public class WcCountBolt extends BaseRichBolt{
//用来存放,单词,以及单词出现的个数
Map<String, Integer> map=new HashMap<String, Integer>();
/**
* 准备阶段(提供逻辑运算的环境)
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
/**
* 获取tuple元祖中每一个单词, 并且按照单词统计出出现的次数
* @param input
*/
@Override
public void execute(Tuple input) {
String word=input.getStringByField("wordList");//到这里获取的方式时一个一个的获取
//存放单词数量,之所以不设置为全局是因为每次key的值都不一样
int count=1;
if (map.containsKey(word)) {//如果出现,则count+1
count=(int)map.get(word)+1;//map.get(key)获取map的值
}
map.put(word, count);
System.err.println("WcCountBolt emit===key:"+word+"==count:"+count);
}
/**
* 向接收的数据的逻辑处理单元声明发送数据的字段名称
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Test
/**
* 测试类
* @author chy
*
*/
public class TestWc {
public static void main(String[] args) {
//创建拓扑结构
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("WcSpout", new WcSpout());
tb.setBolt("WcSplitBolt",new WcSplitBolt()).shuffleGrouping("WcSpout");
//fieldsGrouping:根据单词属性名称进行分组
tb.setBolt("WcCountBolt", new WcCountBolt(), 3).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", new Config(), tb.createTopology());
}
}
结果展示
因为spout采取随机推送, 因此数据重复的可能性非常大
第三章 Storm Grouping
由上面两个案例的test方法中我们可以看到Storm Grouping的作用,下面我们来具体学习一下它吧~~~
1. Shuffle Grouping
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
轮询,平均分配
2. Fields Grouping
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
3. All Grouping
广播发送,对于每一个tuple,所有的bolts都会收到
4. Global Grouping
全局分组,把tuple分配给task id最低的task 。
5. None Grouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
6. Direct Grouping
指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
7. Local or shuffle grouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
8. customGrouping
自定义,相当于mapreduce 自己去实现一个partition一样。
第四章 安装
伪分布式搭建
单一节点安装, 但是具备分布式所具备的所有组件
## 单机模式
## 上传解压,资料分享至末尾
$ tar xf apache-storm-0.10.0.tar.gz
$ cd apache-storm-0.10.0
$ storm安装目录下创建log: mkdir logs
$ ./bin/storm --help
下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer
##错误信息放到标准输入中,
$ ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 &
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ ./bin/storm logviewer >> ./logs/logviewer.out 2>&1 &
# 需要等一会儿
$ jps
6966 Jps
6684 logviewer
6680 dev_zookeeper
6681 nimbus
6682 core
6683 supervisor
# 访问图形化界面( 图1 )
http://nodex:8080
# 提交任务到Storm集群当中运行:
## 首先将WrodCount程序打包成 WrodCount.jar 放到/root/chy/software ,需要阅读下方的注意事项
## 在Strom根目录下运如下命令 ./bin/storm jar jar全路径 主类/启动类的全路径( 图2 )
./bin/storm jar /root/chy/software/WrodCount.jar ah.szxy.storm.tesTestWc wc
注意: 在将项目打包放到伪分布式环境中时, 修改了主类如下的代码, 使其能够依靠集群环境下运行
//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
Config config = new Config();
if (args.length>0) {
try {
StormSubmitter.submitTopology(args[0], config, tb.createTopology());
} catch (Exception e) {
}
}else {
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", config, tb.createTopology());
}
图1
图2
完全分布式搭建
环境要求
java -version
JDK 1.6+
python -V (系统内置)
Python 2.6.6+
ZooKeeper3.4.5+
storm 0.9.4+
各节点分配情况 | Nimbus | Supervisor | Zookeeper |
---|---|---|---|
node2 | * | * | |
node3 | * | * | |
node4 | * | * |
具体步骤
思路: 首先在node2配置storm, 配置完成后分发给node3,node4
node1作为nimbus,
# 1. 开始配置storm.yaml
$ vim conf/storm.yaml
--------------------------------------
storm.zookeeper.servers:
- "node2"
- "node3"
- "node4"
# 任务的存储目录
storm.local.dir: "/tmp/storm"
# 声明主节点在哪里
nimbus.host: "node2"
# 指定从节点的槽位,一个从节点对应四个槽位,一个槽位对应一个worker,一个worker对应一个端口
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
-------------------------------
# 2.在storm目录中创建logs目录
$ mkdir logs
# 3. (分发)集群其他服务器node3,node4
## 启动ZooKeeper集群(node2,3,4)
zkServer.sh start
# 4. node1上启动Nimbus
## 2>&1的意思就是将标准错误重定向到标准输出, & 为后台输出
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ tail -f logs/nimbus.log
$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ tail -f logs/ui.log
# 5. 节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ tail -f logs/supervisor.log (当然node1也可以启动supervisor)
# 6.访问图形化界面(图1),至此安装完成
http://node2:8080/
# 集群测试
## 上传jar任务到Storm集群当中运行(可以从Supervisor节点提交,但是会汇总到nimbus的/tmp/storm目录下, 图2,图3):
$ ./bin/storm jar /root/chy/software/WrodCount2.jar ah.szxy.storm.test.TestWc wc
## 观察关闭一个supervisor后,nimbus的重新调度
## 再次启动一个新的supervisor后,观察,并rebalance, 可以通过图形化页面来操作
注意: 在打包前, 修改了主类的相关代码 , 设置了相关的进程和线程数, 以及worker的数目
public class TestWc {
/**
* 建立拓扑结构, 加入集群中运行
* @param args 命令行参数
*/
public static void main(String[] args) {
//创建拓扑结构
TopologyBuilder tb=new TopologyBuilder();
tb.setSpout("WcSpout", new WcSpout(),2);
tb.setBolt("WcSplitBolt",new WcSplitBolt(),4).shuffleGrouping("WcSpout");
//fieldsGrouping:根据单词属性名称进行分组
tb.setBolt("WcCountBolt", new WcCountBolt(),2).setNumTasks(4).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
Config config = new Config();
config.setNumWorkers(2);
if (args.length>0) {
try {
StormSubmitter.submitTopology(args[0], config, tb.createTopology());
} catch (Exception e) {
}
}else {
//创建本地集群
LocalCluster lc=new LocalCluster();
//发布任务到集群
lc.submitTopology("WordCount", config, tb.createTopology());
}
}
}
图1
图2
图3
第五章 深入理解Storm
一 Strom架构
Storm架构组件介绍
-
Nimbus
资源调度
任务分配
接收jar包 -
Supervisor
接收nimbus分配的任务
启停自己管理的worker进程(当前supervisor上worker数量由配置文件设定) -
Worker
运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)
worker任务类型,即spout任务、bolt任务两种
启动executor(executor即worker, JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务) -
Zookeeper
健康检查( 心跳检测 )
程序协调( 主备切换 )
Storm与Hadoop结构区别
Storm任务流程
Storm本地目录树
Storm Zookeeper目录树
二 Storm集群的并发机制
Worker – 进程
-
一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
-
这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成
Executor – 线程
-
Executor是由Worker进程中生成的一个线程
-
每个Worker进程中会运行拓扑当中的一个或多个Executor线程
-
一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。
Task
-
实际执行数据处理的最小单元
-
每个task即为一个Spout或者一个Bolt
-
Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整
(默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)
代码实现
设置Worker进程数
Config.setNumWorkers(int workers)
设置Executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)
:其中, parallelism_hint即为executor线程数
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val)
例:
Config conf = new Config() ;
conf.setNumWorkers(2);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new MySpout(), 1);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout);
复杂情况下的配置图与代码截图
该图5进程6任务的原因是: 有一个进程分配了两个任务(GreenBolt)
配置图
代码截图
因为有两个worker, 因此进程数是原来的两倍, 可知原来进程为5个
动态调整Worker进程数量、以及Executor线程数量
Rebalance – 再平衡
即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量
支持两种调整方式:
1、通过Storm UI
2、通过Storm CLI
通过Storm CLI动态调整:
例:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
可以通过 help rebalance
将mytopology拓扑worker进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个
“ yellow-bolt ”所使用的线程数量调整为10个
三 Storm通信机制
Worker进程间的数据通信
- ZMQ
ZeroMQ 开源的消息传递框架,并不是一个MessageQueue - Netty
Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
Worker内部的数据通信
- Disruptor
实现了“队列”的功能。
可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者并行取出消息数据处理。
四 Storm容错机制
1、集群节点宕机
- Nimbus服务器
单点故障?重启(极小概率出现, 因为自身基于Netty和队列机制) - 非Nimbus服务器
故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行
2、进程挂掉
-
Worker
挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上 -
Supervisor
无状态(所有的状态信息都存放在Zookeeper中来管理)
快速失败(每当遇到任何异常情况,都会自动毁灭) -
Nimbus
无状态(所有的状态信息都存放在Zookeeper中来管理)
快速失败(每当遇到任何异常情况,都会自动毁灭)
3、消息的完整性
-
从Spout中发出的Tuple,以及基于他所产生Tuple, 由这些消息就构成了一棵tuple树
-
当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性
Acker – 消息完整性的实现机制
- Storm的拓扑当中特殊的一些任务
- 负责跟踪每个Spout发出的Tuple的DAG(有向无环图)
五 Storm Drpc
-
DRPC (Distributed RPC) remote procedure call :分布式远程过程调用
-
DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
-
DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。
(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
Drpc 流程介绍
-
客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。
实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。 -
DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
定义DRPC拓扑:
-
方法1:
通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现 -
方法2:
直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults
运行模式
1、本地模式
2、远程模式(集群模式)
# 1. 修改配置文件conf/storm.yaml(指定为当前主节点nimbus即可)
----------将该更改分发到集群的其他节点-----------------
drpc.servers:
- "node2“
----------------------------------------------------
# 2. 启动DRPC Server
bin/storm drpc &
# 3. 通过StormSubmitter.submitTopology提交拓扑
六 Strom 事务机制
事务性拓扑(Transactional Topologies):保证消息(tuple)被且仅被处理一次
官网介绍
Design 1
强顺序流(强有序)
- 引入事务(transaction)的概念,每个transaction(即每个tuple)关联一个transaction id。
- Transaction id从1开始,每个tuple会按照顺序+1。
- 在处理tuple时,将处理成功的tuple结果以及transaction id同时写入数据库中进行存储。
两种情况:
1、当前transaction id与数据库中的transaction id不一致( 表示新的事务, 往里面存)
2、两个transaction id相同( 覆盖或者让新的变量指向原来的数据库)
缺点:
一次只能处理一个tuple,无法实现分布式计算
Design 2
强顺序的Batch流
- 事务(transaction)以batch为单位,即把一批tuple称为一个batch,每次处理一个batch。
- 每个batch(一批tuple)关联一个transaction id
- 每个batch内部可以并行计算
Design 3 Storm’s design
一个关键的认识是,并非所有处理批处理元组的工作都需要有序地进行。例如,在计算全局计数时,计算分为两个部分:
- 计算批次的部分计数
- 使用部分计数更新数据库中的全局计数
#2的计算需要在批之间进行严格排序,但是没有理由您不应该通过为多个批并行计算#1 来流水线化批的计算。因此,当批次1正在更新数据库时,批次2至10可以计算其部分计数。
Storm通过将批处理的计算分为两个阶段来实现这一区别:
- 处理阶段:这是可以并行完成批处理的阶段
- 提交阶段:批处理的提交阶段是有序的。因此,直到成功完成批次1的提交后,批次2的提交才完成。
这两个阶段一起称为“交易”。在给定的时刻,许多批次可以处于处理阶段,但是只有一个批次可以处于提交阶段。如果批处理或提交阶段发生任何故障,则将重播整个事务(两个阶段)。
Design details(设计细节)
-
Manages state - 状态管理
Storm通过Zookeeper存储所有transaction相关信息(包含了:当前transaction id 以及batch的元数据信息) -
Coordinates the transactions - 协调事务
Storm会管理决定transaction应该处理什么阶段(processing、committing) -
Fault detection - 故障检测
Storm内部通过Acker机制保障消息被正常处理(用户不需要手动去维护) -
First class batch processing API
Storm提供batch bolt接口
三种事务:
三种分区介绍
- 普通事务
- Partitioned Transaction - 分区事务
- Opaque Transaction - 不透明分区事务
第六章 Flume-Kafka-Storm整合案例实现
前提: 安装了Flume,Kafka,以及Storm
Flume介绍以及安装
Kafka介绍以及安装
一 架构设计
二 过程描述
该过程实现了数据的清洗
-
我们通过客户端(flume的api程序RpcClientDemo )向flume写入数据
-
Flume通过启动脚本整合kafka将输入写入到topic 中, 名为testflume
-
Storm集群通过kafkaSpout 程序接收 testflume 的数据, 通过 FilterBolt过滤指定格式的数据,然后通过 kafkaBolt 输出到Kafka集群中的 LogError主题中输出
-
我们可以通过kafka的消费者端来查看 LogError主题中输出的指定格式的数据
三 具体步骤
1.启动zk集群,kafka集群,flume
启动zk
zkServer.sh start
启动kafka
kafka-server-start.sh /opt/kafka/config/server.properties
启动flume( flume-kafka.conf为flume的启动脚本,见本人Kafka博文介绍第三章 )
flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console
2.启动kafka的消费者端进程
监听testflume 数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume
监听LogError数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic LogError
3.运行代码测试
a.运行RpcClientDemo , 查看testflume监听的数据流转情况(图1)
/**
* Flume官网案例
* http://flume.apache.org/FlumeDeveloperGuide.html
* @author root
*/
public class RpcClientDemo {
public static void main(String[] args) {
MyRpcClientFacade client = new MyRpcClientFacade();
// Initialize client with the remote Flume agent's host and port
client.init("node2", 41414);
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i = 100; i < 150; i++) {
String sampleData = "Hello Flume!ERROR" + i;
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
}
client.cleanUp();
}
}
class MyRpcClientFacade {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
// Setup the RPC connection
this.hostname = hostname;
this.port = port;
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of the
// above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
public void sendDataToFlume(String data) {
// Create a Flume Event object that encapsulates the sample data
Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
// Send the event
try {
client.append(event);
} catch (EventDeliveryException e) {
// clean up and recreate the client
client.close();
client = null;
client = RpcClientFactory.getDefaultInstance(hostname, port);
// Use the following method to create a thrift client (instead of
// the above line):
// this.client = RpcClientFactory.getThriftInstance(hostname, port);
}
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
b.运行LogFilterTopology ,过滤数据,并将数据发送给kafka集群中的 LogError主题,效果如图2
/**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class LogFilterTopology {
public static class FilterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String line = tuple.getString(0);
System.err.println("Accept: " + line);
// 包含ERROR的行留下
if (line.contains("ERROR")) {
System.err.println("Filter: " + line);
collector.emit(new Values(line));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
declarer.declare(new Fields("message"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// https://github.com/apache/storm/tree/master/external/storm-kafka
// config kafka spout,话题
String topic = "testflume";
ZkHosts zkHosts = new ZkHosts("node2:2181,node3:2181,node4:2181");
// /MyKafka,偏移量offset的根目录,记录队列取到了哪里
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
// 是否从头开始消费
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 1000;
// StringScheme将字节流转解码成某种编码的字符串
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3);
// set bolt
builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");
// 数据写出
// set kafka bolt
// withTopicSelector使用缺省的选择器指定写入的topic: LogError
// withTupleToKafkaMapper tuple==>kafka的key和message
KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");
Config conf = new Config();
// set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "node2:9092,node3:9092,node4:9092");
/**
* Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
* 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
* 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
*/
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("kafka.broker.properties", props);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node2", "node3", "node4" }));
// 本地方式运行
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology());
}
}
c.修改 RpcClientDemo 中的循环语句,验证 FilterBolt是否起到了过滤的作用
查看testflume, 图3; 查看LogError, 图4
可以看到数据流转到了testflume主题, 而没有流转到LogError,由此可以看出 FilterBolt起到了过滤的作用
// Send 10 events to the remote Flume agent. That agent should be
// configured to listen with an AvroSource.
for (int i = 200; i < 250; i++) {
String sampleData = "Hello Flume!" + i;
client.sendDataToFlume(sampleData);
System.out.println("发送数据:" + sampleData);
}
图1
图2
图3
四 项目应用架构
-
采集层:实现日志收集,使用负载均衡策略
-
消息队列:作用是解耦及不同速度系统缓冲
-
实时处理单元:用Storm来进行数据处理,最终数据流入DB中
-
展示单元:数据可视化,使用WEB框架展示
如果自己想应聘大公司, 一定要去别人技术分享网站看一看,就像美团技术团队官网
链接:https://pan.baidu.com/s/1wu9qYQZPxkqOdiY5QGR2cg
点赞私聊获取资料~~~
提取码:m8kh
复制这段内容后打开百度网盘手机App,操作更方便哦