http://blog.csdn.net/ch717828/article/details/50748912
1. Machines & Environment Setup
I prepared 3 machines:
10.101.214.71
10.101.214.73
10.101.214.74
Kafka and Storm are installed on all three machines; see the two articles above for details.
Note: in the earlier articles I installed Storm 0.9.1, which lacks many of the packages needed for Kafka integration, so it has been upgraded to 0.9.2.
2. Custom Storm Logging
To print the messages Storm receives from Kafka clearly, a custom log is defined here.
- // On machines 73 and 74, edit /usr/share/storm/logback/cluster.xml
- <appender name="mylog" class="ch.qos.logback.core.rolling.RollingFileAppender">
-   <file>${storm.home}/logs/mylog.log</file> <!-- output path of the log file -->
-   <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-     <fileNamePattern>${storm.home}/logs/mylog.log.%i</fileNamePattern> <!-- naming pattern for rolled-over files -->
-     <minIndex>1</minIndex>
-     <maxIndex>20</maxIndex> <!-- together these two lines control how many rolled files are kept -->
-   </rollingPolicy>
-   <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
-     <maxFileSize>100MB</maxFileSize> <!-- maximum size of a single log file -->
-   </triggeringPolicy>
-   <encoder>
-     <pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ} %c{1} [%p] %m%n</pattern> <!-- format of the log output -->
-   </encoder>
- </appender>
- <logger name="ch.main.MyKafkaTopology" additivity="false">
-   <!-- name selects which package (or a single class) has its log output captured -->
-   <level value="INFO"/> <!-- log level to output; INFO here because I want business logs -->
-   <appender-ref ref="mylog"/> <!-- name of the appender defined above -->
- </logger>
Once this is configured, all INFO logs produced by ch.main.MyKafkaTopology are written to /usr/share/storm/logs/mylog.log.
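The routing here is purely by logger name: any SLF4J logger whose name falls under ch.main.MyKafkaTopology is sent to the mylog appender (logback also treats the $ of inner classes, e.g. MyKafkaTopology$KafkaWordSplitter, as a hierarchy separator, so the bolts' loggers in the code below match as well). A minimal sketch just to show the name matching; the class name is hypothetical and not part of the project:
- package ch.main;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- // Hypothetical class, only to illustrate the logger-name matching.
- public class LoggerNameCheck {
-     // Use the fully qualified name configured in cluster.xml above.
-     private static final Logger LOG = LoggerFactory.getLogger("ch.main.MyKafkaTopology");
-     public static void main(String[] args) {
-         // With the appender/logger configured above, this INFO line lands in ${storm.home}/logs/mylog.log
-         LOG.info("hello mylog");
-     }
- }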
3. Writing the Code
pom.xml
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>0.9.2-incubating</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>0.9.2-incubating</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.9.0.0</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
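A side note that is an assumption about deployment rather than something from the original setup: when the jar is later submitted with storm jar, storm-core is already on the cluster's classpath, so it is often declared with provided scope to keep it out of the packaged jar. A hedged sketch of that variant:
- <!-- Assumed variant, not in the original pom: compile against storm-core but do not bundle it,
-      because the Storm cluster supplies it at runtime. -->
- <dependency>
-     <groupId>org.apache.storm</groupId>
-     <artifactId>storm-core</artifactId>
-     <version>0.9.2-incubating</version>
-     <scope>provided</scope>
- </dependency>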
Java code
- package ch.main;
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.StormSubmitter;
- import backtype.storm.generated.AlreadyAliveException;
- import backtype.storm.generated.InvalidTopologyException;
- import backtype.storm.spout.SchemeAsMultiScheme;
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import storm.kafka.*;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * Created by chenhong on 16/2/24.
- */
- public class MyKafkaTopology {
- public static class KafkaWordSplitter extends BaseRichBolt{
- // private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
- private static final Logger LOG = LoggerFactory.getLogger(KafkaWordSplitter.class);
- private static final long serialVersionUID = 1L;
- private OutputCollector collector;
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
- @Override
- public void execute(Tuple input) {
- String line = input.getString(0);
- LOG.info("RECE[kafka -> splitter] "+line);
- String[] words = line.split("\\s+");
- for(String word : words){
- LOG.info("EMIT[splitter -> counter] "+word);
- collector.emit(input,new Values(word,1));
- }
- collector.ack(input);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word","count"));
- }
- }
- public static class WordCounter extends BaseRichBolt {
- // private static final Log LOG = LogFactory.getLog(WordCounter.class);
- private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
- private static final long serialVersionUID =1L;
- private OutputCollector collector;
- private Map<String,AtomicInteger> counterMap;
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector=collector;
- this.counterMap = new HashMap<String,AtomicInteger>();
- }
- @Override
- public void execute(Tuple input) {
- String word = input.getString(0);
- int count = input.getInteger(1);
- LOG.info("RECE[splitter -> counter] "+word+" : "+count);
- AtomicInteger ai = this.counterMap.get(word);
- if(ai==null){
- ai= new AtomicInteger();
- this.counterMap.put(word,ai);
- }
- ai.addAndGet(count);
- collector.ack(input);
- LOG.info("CHECK statistics map: "+this.counterMap);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word","count"));
- }
- @Override
- public void cleanup() {
- LOG.info("The final result:");
- Iterator<Map.Entry<String,AtomicInteger>> iter = this.counterMap.entrySet().iterator();
- while(iter.hasNext()){
- Map.Entry<String,AtomicInteger> entry =iter.next();
- LOG.info(entry.getKey()+"\t:\t"+entry.getValue().get());
- }
- }
- }
- public static void main(String[] args) throws AlreadyAliveException,InvalidTopologyException,InterruptedException{
- String zks = "10.101.214.71:2181,10.101.214.73:2181,10.101.214.74:2181";
- String topic ="my-replicated-topic5";
- String zkRoot ="/kafka" ;
- String id = "word"; // the read offsets/state are stored under /zkRoot/id, so id acts much like a consumer group
- BrokerHosts brokerHosts = new ZkHosts(zks,"/kafka/brokers");
- SpoutConfig spoutConf = new SpoutConfig(brokerHosts,topic,zkRoot,id);
- spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
- spoutConf.forceFromStart = false;
- spoutConf.zkServers= Arrays.asList(new String[]{"10.101.214.71","10.101.214.73","10.101.214.74"});
- spoutConf.zkPort=2181;
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Kafka topic was created with 5 partitions, so spout parallelism is set to 5
- builder.setBolt("word-splitter",new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");
- builder.setBolt("word-counter",new WordCounter() ).fieldsGrouping("word-splitter",new Fields("word"));
- Config config = new Config();
- String name = MyKafkaTopology.class.getSimpleName();
- if(args !=null && args.length>0 ){
- //config.put(Config.NIMBUS_HOST,args[0]);
- config.setNumWorkers(3);
- StormSubmitter.submitTopology(name,config,builder.createTopology());
- }else{
- config.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(name,config,builder.createTopology());
- Thread.sleep(60000);
- cluster.shutdown();
- }
- }
- }
4. Submit and Run
Package the project with mvn:
- mvn clean install
To use Kafka from Storm, the dependency jar files need to be copied into the lib directory of the Storm installation on the cluster machines:
- cp /usr/local/kafka/libs/kafka_2.11-0.9.0.0.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/scala-library-2.11.7.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/snappy-java-1.1.1.7.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/zkclient-0.7.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/log4j-1.2.17.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/slf4j-api-1.7.6.jar /usr/share/storm/lib/
- cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/share/storm/lib/
Submit:
- // Submit on machine 71
- storm jar StormKafka0.1-1.0-SNAPSHOT.jar ch.main.MyKafkaTopology MyKafkaTopology
- // On machine 71, start a Kafka console producer to generate messages
- /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh --broker-list 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092 --topic my-replicated-topic5
- (type in some arbitrary input)
- // Check the logs on machines 73 and 74
- cat /usr/share/storm/logs/mylog.log
- (you can see the logs printed by MyKafkaTopology)
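Before checking the logs, you can also confirm that the topology was actually accepted: the storm CLI has a list command that prints the topologies known to Nimbus (run on 71 here; an extra step I'm adding, not part of the original walkthrough):
- // On machine 71 (Nimbus): MyKafkaTopology should appear with status ACTIVE
- storm list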
Below is an excerpt of what I saw in mylog.log:
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] 123123
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] aa
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] bbc
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ccc
- 2016-02-26T12:47:13.238+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ddd
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] eeee
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] ffffff
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$KafkaWordSplitter [INFO] EMIT[splitter -> counter] jsdkfjasnng
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123 : 1
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=18, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=19, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] RECE[splitter -> counter] 123123 : 1
- 2016-02-26T12:47:13.239+0800 c.m.MyKafkaTopology$WordCounter [INFO] CHECK statistics map: {=2, aa=6, bbc=6, ccc=6, --broker-list=1, ddd=6, eeee=6, my-replicated-topic5=1, asdfasdfasdf=15, 123=7, jsdkfjasnng=6, 123123=20, 10.101.214.71:9092,10.101.214.73:9092,10.101.214.74:9092=1, --topic=1, v=1, /usr/local/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh=1, ffffff=6}
Other Notes
1. Starting Storm fails with an error at line 61: normclasspath = cygpath if sys.platform == 'cygwin' else identity (this usually means the default python is too old to parse that conditional expression)
- Install Python 2.7
- Edit /usr/bin/storm
- Change the first line from #!/usr/bin/python to #!/home/tops/bin/python2.7
Many odd problems can come up during this integration, and I stepped into quite a few pits along the way; feel free to send a private message or leave a comment if you run into issues.