public class SumBolt implements IBasicBolt { /** * 对发射所有字符串统计总个数及去重个数 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector collector) { try { // 变量放在方法外面会进行累加,所有数据放在counts这个Map当中。 long word_sum = 0;// 总数 long word_count = 0;// 去重后个数 String word = input.getString(0); Integer count = input.getInteger(1); counts.put(word, count); // 获取总数,遍历counts的values,进行sum Iterator<Integer> i = counts.values().iterator(); while (i.hasNext()) { word_sum += i.next(); } Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String oneWord = i2.next(); if (oneWord != null) { word_count ++; } } System.err.println("a="+counts.get("a")+" b="+counts.get("b")+" c="+counts.get("c")+" d="+counts.get("d")); System.err.println( Thread.currentThread().getName() + "word_sum=" + word_sum + ",-------word_count=" + word_count); } catch (Exception e) { throw new FailedException("split error!"); } } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } }
2.topology增加字符统计
public class WordCountTopology { /** * 提交topology的main函数及字符统计处理类 */ public static class SplitSentence extends ShellBolt implements IRichBolt { private static final long serialVersionUID = 1L; /** * 字符统计处理bolt类 */ public static class WordCount extends BaseBasicBolt { private static final long serialVersionUID = 1L; // 多线程下不能统计Map的key个数,和value表示的字符总数。因为多线下表示只是一部分 Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); //System.err.println(Thread.currentThread().getName() + "---word:" + word + " count:" + count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } //提交topology的main函数 public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); //读取数据用1个线程,防止数据重复读取 builder.setSpout("spout", new RandomSentenceSpout(), 1); //从spout源读取数据,设置2个线程处理字符分割 builder.setBolt("split", new MysplitBolt(" "), 2).shuffleGrouping("spout"); /** * 上个bolt接收数据,设置3个线程处理数据统计。 * Fields Grouping:按Field分组,相同的tuple会分发给同一个线程(Executer或task)处理。所以不担心多线程问题 * 比如按singleWord来分组, 具有同样singleWord的tuple会被分到相同的Bolts, 而不同的word则会被分配到不同的Bolts。 */ //builder.setBolt("count", new WordCount(), 3).fieldsGrouping("split", new Fields("singleWord")); //平均分配tuple数据至每一个线程处理,统计会有线程安全问题 builder.setBolt("count", new WordCount(), 3).shuffleGrouping("split"); builder.setBolt("sum", new SumBolt(),1).shuffleGrouping("count"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); } } 
public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } } }
-----------------------其它类--------------------------------------
3.字符发射spout类
/** * 字符发射spout类 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { //a:2 , b:2 , c:1 , d:3 String[] sentences = new String[] { sentence("a b c d "), sentence("b d"), sentence("a d") }; for (String sentence : sentences) {// 发射三行数据致bolt处理 _collector.emit(new Values(sentence)); } Utils.sleep(1000 * 1000); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("firstSpout")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }
4.字符切割处理bolt类
/** * 字符切割处理bolt类 */ public class MysplitBolt implements IBasicBolt { private static final long serialVersionUID = 1L; String patton; public MysplitBolt(String patton) { this.patton = patton; } /** * 接收处理每一行数据 */ public void execute(Tuple input, BasicOutputCollector collector) { try { String sen = input.getStringByField("firstSpout"); if (sen != null) { for (String word : sen.split(patton)) {// 发射多个字符数据,让下一级bolt处理 collector.emit(new Values(word)); } } } catch (Exception e) { throw new FailedException("split error!"); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("singleWord")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } public void cleanup() { // TODO Auto-generated method stub } }
5.pom文件
引用
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Maven build descriptor for the Storm word-count example project. -->
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates; the project is packaged as a jar. -->
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<!-- Libraries the project depends on. Every entry pins an explicit version. -->
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<!-- Logging: log4j 2 API and core, plus the SLF4J API and bridge artifacts below. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<!-- NOTE(review): this bridge is at 1.6.6 while slf4j-api below is 1.7.21 - confirm the mismatch is intended. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<!-- Apache Storm artifacts; both are pinned to the same version. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<!-- JUnit is the only dependency declared with test scope. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<!-- Base file name of the built artifact, overriding the default artifactId-version name. -->
<finalName>StormMavenProject</finalName>
</build>
</project>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>