My requirement is to consume data from Kafka, process it, and finally save it to HBase.
1. Topology
As the input source I use the KafkaSpout class from storm-kafka-0.93.jar. The spout I wrote myself is pasted at the end; it has a problem and tends to hang the whole Topology.
package test.kafka;

import java.util.Arrays;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import test.SimpleBolt;
import test.SimpleBolt2;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class KafkaTopology {

    public static void main(String[] args) {
        try {
            // Instantiate the TopologyBuilder.
            TopologyBuilder topologyBuilder = new TopologyBuilder();

            // Configure the spout; its parallelism hint controls how many executor threads
            // this component gets in the cluster.
            // String zks = "h1:2181,h2:2181,h3:2181";
            // String topic = "my-replicated-topic5";
            // String zkRoot = "/storm"; // default zookeeper root configuration for storm
            // String id = "word";
            //
            // BrokerHosts brokerHosts = new ZkHosts(zks);
            // SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
            // spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
            // spoutConf.forceFromStart = true;
            // spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
            // spoutConf.zkPort = 2181;

            BrokerHosts brokerHosts = new ZkHosts("10.10.92.161:2181");
            String topic = "TEST-TOPIC3";
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and consumer id.
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/storm", "jd-group5");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Arrays.asList(new String[] {"10.10.92.161"});
            spoutConfig.zkPort = 2181;
            // startOffsetTime: -2 = start from the beginning of the Kafka log, -1 = start from the
            // latest offset; otherwise the spout resumes from the offset stored in ZooKeeper.
            // spoutConfig.startOffsetTime = Integer.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_OFFSETTIME));
            // spoutConfig.forceFromStart = Boolean.valueOf(StormAppConfigUtil.get(CommonConstant.KAFKA_CONF_FORCESTART));
            spoutConfig.forceFromStart = false; // whether to re-read everything from the beginning on each start; false = do not

            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 5).setNumTasks(10);

            // Configure the processing bolts and their parallelism; they receive tuples from the
            // upstream component via shuffle grouping.
            topologyBuilder.setBolt("kafka-bolt", new SimpleBolt(), 5).setNumTasks(10).shuffleGrouping("kafka-spout");
            topologyBuilder.setBolt("kafka-hbase-bolt", new SimpleBolt2(), 5).setNumTasks(10).shuffleGrouping("kafka-bolt");

            Config config = new Config();
            config.setDebug(false);

            if (args != null && args.length > 0) {
                /* Number of worker slots this topology claims on the cluster; each slot corresponds
                 * to one worker process on a supervisor node. If you request more slots than the
                 * cluster has free, the topology may still be submitted but will not actually run.
                 * For example, if only 2 worker slots are left and you ask for 4, the topology is
                 * accepted but sits idle; once other topologies are killed and slots are freed,
                 * it starts running normally. */
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } else {
                // Local-mode startup code.
                config.setNumWorkers(2);
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config, topologyBuilder.createTopology());
                // Thread.sleep(5000);
                // cluster.killTopology("simple");
                // cluster.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
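If you need to control where KafkaSpout starts reading, the numeric values mentioned in the comment above map to helpers on kafka.api.OffsetRequest. A minimal sketch, assuming the same spoutConfig variable as in the topology above (note that with forceFromStart = false, an offset already committed in ZooKeeper still takes precedence):

    // Start from the earliest offset Kafka still retains (the literal -2):
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    // Or only read new messages from the latest offset (the literal -1):
    // spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    // With forceFromStart = false and a committed offset in ZooKeeper, the spout resumes from ZooKeeper instead.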
2. The bolt that takes data from Kafka and processes it
package test;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Created by IntelliJ IDEA. User: comaple.zhang Date: 12-8-28 Time: 14:11
 */
@SuppressWarnings("serial")
public class SimpleBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info", "id"));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String mesg = input.getString(0);
            if (mesg != null) {
                // Note: this emit is not anchored to the input tuple, so a downstream
                // failure will not cause a replay from the spout.
                collector.emit(new Values(mesg + "mesg is processed!", mesg));
                // System.out.println("Bolt" + this.hashCode() + ":" + mesg);
            }
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
            return; // do not ack a tuple that has already been failed
        }
        collector.ack(input);
    }
}
3. The bolt that writes into HBase
package test;

import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Created by IntelliJ IDEA. User: comaple.zhang Date: 12-8-28 Time: 14:11
 */
@SuppressWarnings("serial")
public class SimpleBolt2 extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String id = input.getStringByField("id");
            String mesg = input.getStringByField("info");
            if (mesg != null) {
                Table table = Config.con.getTable(TableName.valueOf("xyz"));
                // One Put represents one row; each row needs a unique rowkey,
                // which here is the value passed to the Put constructor.
                Put put = new Put(id.getBytes());
                // First column of this row: column family "cf1", qualifier "val".
                put.addColumn("cf1".getBytes(), "val".getBytes(), mesg.getBytes());
                table.put(put);
            }
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
            return; // do not ack a tuple that has already been failed
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // End of the stream: this bolt emits nothing.
    }
}
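Opening a Table for every tuple works, but it adds overhead on a hot path. Below is a minimal sketch of my own variation (not part of the original example) that fetches the Table once in prepare() and releases it in cleanup(), assuming the same Config.con connection and the same "xyz"/"cf1" schema; the class name SimpleBolt2Reuse is just for illustration.

package test;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import config.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class SimpleBolt2Reuse extends BaseRichBolt {

    private OutputCollector collector;
    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // Obtain the Table handle once per bolt task instead of once per tuple.
            table = Config.con.getTable(TableName.valueOf("xyz"));
        } catch (IOException e) {
            throw new RuntimeException("could not open HBase table", e);
        }
    }

    @Override
    public void execute(Tuple input) {
        try {
            Put put = new Put(input.getStringByField("id").getBytes());
            put.addColumn("cf1".getBytes(), "val".getBytes(), input.getStringByField("info").getBytes());
            table.put(put);
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        try {
            if (table != null) {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}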
4. A small HBase configuration class
package config;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class Config {

    public static Configuration configuration;
    public static Connection con;

    static {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "10.10.92.151");
        try {
            // One shared HBase connection per JVM (i.e. per Storm worker),
            // created when this class is first loaded.
            con = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
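For the Put in SimpleBolt2 to succeed, the target table has to exist. A one-off sketch for creating it, assuming an HBase 1.x client where HTableDescriptor/HColumnDescriptor are still the current API, reusing Config.con, and keeping the "xyz"/"cf1" names from above; the class name CreateTable is just for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;

import config.Config;

public class CreateTable {
    public static void main(String[] args) throws IOException {
        try (Admin admin = Config.con.getAdmin()) {
            TableName name = TableName.valueOf("xyz");
            if (!admin.tableExists(name)) {
                // One column family "cf1", matching the Put in SimpleBolt2.
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("cf1"));
                admin.createTable(desc);
            }
        }
    }
}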
Finally, here is the Spout I wrote myself. Its problem is that it often hangs the Topology; my workaround was to wait 1 millisecond after each emit, after which the problem stopped appearing, though I don't really know why. I'm still not very familiar with either Storm or Kafka. A sketch of a more likely fix follows the code below.
package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private static ConsumerConnector consumer;

    /**
     * Initializes the collector and the Kafka high-level consumer.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "10.10.92.161:2181");
        // group.id identifies the consumer group
        props.put("group.id", "jd-group5");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Called repeatedly by Storm; each call can emit tuples into the topology.
     * Note: this version creates the message streams on every call and then blocks in
     * it.hasNext() when no messages arrive, which is the likely reason the topology stalls.
     */
    @Override
    public void nextTuple() {
        try {
            // String msg = info[rd.nextInt(10)];
            // collector.emit(new Values(msg), rd.nextInt(100));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("TEST-TOPIC2", new Integer(1));
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("TEST-TOPIC2").get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                String message = it.next().message();
                collector.emit(new Values(message));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Declares the output field. The field name matters mainly for fields grouping; the
     * declarer can also define extra stream ids via declarer.declareStream() for more
     * complex topologies.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("tuple completed: " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("tuple failed: " + msgId);
    }
}
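The 1 ms wait masks the symptom rather than the cause. With the old high-level consumer, createMessageStreams() should be called only once per ConsumerConnector, and nextTuple() must never block for long, because Storm calls it in a loop on the spout's single thread. A rough sketch of how this could be restructured, under the same topic, group and ZooKeeper settings as above; setting consumer.timeout.ms makes hasNext() throw ConsumerTimeoutException instead of blocking forever, and it additionally needs imports for kafka.consumer.ConsumerTimeoutException and backtype.storm.utils.Utils. The fields, declareOutputFields(), ack() and fail() stay as in the spout above, plus a new ConsumerIterator<String, String> field named it.

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        Properties props = new Properties();
        props.put("zookeeper.connect", "10.10.92.161:2181");
        props.put("group.id", "jd-group5");
        props.put("auto.offset.reset", "smallest");
        // Make hasNext() give up after 100 ms instead of blocking the spout thread forever.
        props.put("consumer.timeout.ms", "100");

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Create the message stream exactly once; repeated calls on the same connector fail.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("TEST-TOPIC2", 1);
        StringDecoder decoder = new StringDecoder(new VerifiableProperties());
        KafkaStream<String, String> stream =
                consumer.createMessageStreams(topicCountMap, decoder, decoder).get("TEST-TOPIC2").get(0);
        it = stream.iterator(); // 'it' is the new ConsumerIterator<String, String> field
    }

    @Override
    public void nextTuple() {
        try {
            // Emit at most one message per call and return quickly; nextTuple() is expected to be non-blocking.
            if (it.hasNext()) {
                collector.emit(new Values(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // No message arrived within consumer.timeout.ms: back off briefly so we don't spin at 100% CPU.
            Utils.sleep(1);
        }
    }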
The overall structure is not written particularly well; this is only meant as an example. Most of it is pieced together from examples found online, with some integration and modification on my part. Just recording it here for reference.