版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Anbang713/article/details/85220826
本篇我们没有长篇大论的理论,我们通过一个WordCount程序对Storm中的几个重要概念进行大概的了解。而所谓的WordCount程序即对输入的一些句子进行单词次数统计,作为最最入门的程序,入门到就和我们熟悉的HelloWorld一样。接下来,我们就看看这个程序代码。
一、项目工程结构
1.1、pom文件依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.study.storm</groupId>
<artifactId>wordcount</artifactId>
<version>0.0.1-SNAPSHOT</version>
<description>Storm入门之WordCount程序</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
二、RandomSentenceSpout类
/**
* Spout:主要是负责从数据源获取数据,并将数据发送给Bolt
*
* @author Administrator
*
*/
public class RandomSentenceSpout extends BaseRichSpout {
private static final long serialVersionUID = 1858043156247980952L;
private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
private SpoutOutputCollector collector;
private Random random;
/** 对Spout进行初始化,比如说创建一个线程池、数据库连接池等等 */
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}
/**
* spout类最终会运行在某个worker进程的某个executor线程内的某个task中,
* 那个task会负责去不断的无限循环调用nextTuple()方法。这样的话,就可以不断发送最新的数据出去,形成一个数据流。
*/
public void nextTuple() {
Utils.sleep(2000);
String[] sentences = new String[] {
"the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs",
"i am at two with nature" };
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info("【发射句子】sentence=" + sentence);
// 这个values就可以认为是构建一个tuple。tuple是最小的数据单位,无限个tuple组成的流就是一个stream。
collector.emit(new Values(sentence));
}
/**
* 定义一个发射出去的每个tuple中的每个field名称是什么。
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(Variables.FIELD_NAME_SPOUT));
}
}
三、SplitSentenceBolt类
/**
* 每个bolt同样是发送到worker某个executor的task中执行
*
* @author Administrator
*
*/
public class SplitSentenceBolt extends BaseRichBolt {
private static final long serialVersionUID = -1863792429350238883L;
private OutputCollector collector;
/**
* 对于bolt来说,第一个方法就是prepare()方法。
*/
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 每接收到一条数据后,就会交给executor方法来执行
*/
public void execute(Tuple input) {
String sentence = input.getStringByField(Variables.FIELD_NAME_SPOUT);
if (sentence != null && "".equals(sentence) == false) {
String[] words = sentence.split(" ");
for (String word : words) {
collector.emit(new Values(word));
}
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(Variables.FIELD_NAME_BOLT_WORD));
}
}
四、WordCountBolt类
/**
* 单词计数bolt
*
* @author Administrator
*
*/
public class WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = -8940950046975910504L;
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);
private OutputCollector collector;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
String word = input.getStringByField(Variables.FIELD_NAME_BOLT_WORD);
Integer count = wordCounts.get(word);
if (count == null) {
count = 0;
}
wordCounts.put(word, ++count);
LOGGER.info("【单词计数】" + word + "出现的次数是" + count);
collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer
.declare(new Fields(Variables.FIELD_NAME_BOLT_WORD, Variables.FIELD_NAME_BOLT_WORD_COUNT));
}
}
五、WordCountTopology类
/**
* Topology:一个拓扑涵盖数据源获取/生产+数据处理的所有的代码逻辑
*
* @author Administrator
*
*/
public class WordCountTopology {
public static void main(String[] args) {
// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
TopologyBuilder builder = new TopologyBuilder();
// 第一个参数的意思,就是给这个spout设置一个名字
// 第二个参数的意思,就是创建一个spout的对象
// 第三个参数的意思,就是设置spout的executor有几个
builder.setSpout(Variables.SPOUT_ID_RANDOM_SENTENCE, new RandomSentenceSpout(), 2);
builder.setBolt(Variables.BOLT_ID_SPLIT_SENTENCE, new SplitSentenceBolt(), 5).setNumTasks(10)
.shuffleGrouping(Variables.SPOUT_ID_RANDOM_SENTENCE);
// 这里设置fieldsGrouping很重要,相同的单词从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中
// 只有这样子,才能准确的统计出每个单词的数量
builder.setBolt(Variables.BOLT_ID_WORD_COUNT, new WordCountBolt(), 10).fieldsGrouping(
Variables.BOLT_ID_SPLIT_SENTENCE, new Fields(Variables.FIELD_NAME_BOLT_WORD));
Config config = new Config();
config.setMaxTaskParallelism(20);
// 在eclipse本地运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(Variables.TOPOLOGY_WORD_COUNT, config, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
}
}
六、测试结果
可以看到,我们已经能够实现对所有单词出现的次数进行统计了。
七、总结
通过上面的WordCount程序,我们知道Storm中的核心概念有:Topology,Spout,Bolt,Tuple,Stream。
(1)Topology:数据源获取/生产+数据处理的所有的代码逻辑的集合。
(2)Spout:数据源的一个代码组件。在这个spout代码中,我们可以自己尝试去数据源获取数据,比如说从kafka中消费数据。
(3)Bolt:一个业务处理的代码组件。Spout会将数据传送给Bolt进行逻辑处理,各种Bolt还可以串联成一个计算链条。
(4)Tuple:就是一条数据,每条数据都会被封装在tuple中,在多个spout和bolt之间传递。
(5)Stream:就是一个流,一个抽象的概念,源源不断过来的Tuple就组成了一条数据流。