kafka 作为源头 Spout,Storm进行流计算处理(以WordCount为例)
导入依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
开启一个kafka的生产者进程
//启动ZooKeeper
$> zkServer.sh start
//启动kafka服务
$> kafka-server-start.sh /soft/kafka/config/server.properties
//创建一个主题 test
$> kafka-topics.sh --create --zookeeper s202:2181 --replication-factor 3 --partitions 3 --topic test
//启动kafka生产者进程
$> kafka-console-producer.sh --broker-list s202:9092 --topic test
分片 Bolt
public class SplitBolt implements IRichBolt {
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.context = context;
this.collector = collector;
}
public void execute(Tuple tuple) {
String line = tuple.getString(0);
System.out.println(line);
}
public void cleanup() {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
WCApp
public class WCApp {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//配置zk连接
String zkConnString = "s202:2181";
BrokerHosts hosts = new ZkHosts(zkConnString);
//Spout配置
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaspout", kafkaSpout).setNumTasks(2);
builder.setBolt("spilt-bolt", new SplitBolt(), 2).shuffleGrouping("kafkaspout").setNumTasks(2);
Config config = new Config();
config.setNumWorkers(2);
config.setDebug(true);
/**
* 本地模式storm
*/
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wc", config, builder.createTopology());
System.out.println("hello world llll");
}
}