文件配置:
storm.zookeeper.servers:
- "data1"
- "data2"
storm.zookeeper.port: 2181
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["data2","data3"]
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
操作指令:
后台启动脚本命令:
1. Nohup
例如: nohup ./storm nimbus &
解释:后台启动一个脚本命令,并将脚本产生的日志输出到脚本相同目录的nohup.out文件中。
2. >/dev/null 2>&1 &
例如: ./storm nimbus >/dev/null 2>&1 &
解释:后台启动一个脚本命令,并且丢弃掉脚本命令所产生的任何输出日志信息。
Storm常用命令可以访问官网:
http://storm.apache.org/releases/1.0.2/Command-line-client.html
1. nimbus
例如:storm nimbus
解释:启动nimbus(主)守护进程
2. supervisor
例如:storm supervisor
解释:启动supervisor(工作节点)守护进程
3. ui
例如:storm ui
解释:启动ui守护线程。地址http://data2:8080/index.html
4. jar
例如:storm jar /opt/stromdemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.cetc.stromdemo.wordcount.WordCountTopology '/word_count.txt'
解释:提交一个拓扑到storm集群中
5. kill
例如:storm kill WordCountTopology
解释:杀死一个运行中的拓扑
6. deactivate
例如:storm deactivate WordCountTopology
解释:禁用一个运行中的拓扑
7. activate
例如:storm activate WordCountTopology
解释:激活一个禁用的拓扑
8. rebalance
例如:storm rebalance WordCountTopology -w 5 -n 4 -e LineSplitBolt=1
解释:对一个已经运行的任务,重新更改worker数和bolt或spout并发度。但是更改并发度时,只能降低,提高并发度无效。storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*
9. repl
例如:storm repl
解释:打开clojure命令行操作,你可以在其中使用lisp
10. classpath
例如:storm classpath
解释:查看storm在本机所包含的类路径
11. localconfvalue
例如:storm localconfvalue storm.zookeeper.servers
解释:根据key输出storm在本地配置文件中的一个配置的值
12. remoteconfvalue
例如:storm remoteconfvalue storm.zookeeper.servers
解释:根据key输出storm在集群中的配置值
13. list
例如:storm list
解释: 列出运行拓扑及其状态。
14. version
例如:storm version
解释:查看storm版本
代码示例:
import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; import java.util.Random; /** * Created by shea on 2018/2/2. */ public class RandomNameSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; //输出收集器 private SpoutOutputCollector collector; // 模拟一些数据 String[] names = { "zhangsan", "lisi", "wangwu", "zhaoliu", "sunqi", "wangba" }; /** * 不断地往下一个组件发送tuple消息 这里面是该spout组件的核心逻辑 */ public void nextTuple() { Random random = new Random(); int index = random.nextInt(names.length); // 通过随机数拿到一个姓名 String lowerName = names[index]; // 将姓名封装成tuple,发送消息给下一个组件 collector.emit(new Values(lowerName)); // 每发送一个消息,休眠500ms Utils.sleep(500); } /** * 初始化方法,在spout组件实例化时调用一次c */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 声明本spout组件发送出去的tuple中的数据的字段名 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("lowerName")); } }
import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * Created by shea on 2018/2/2. */ public class UpperBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; //获取数据 public void execute(Tuple tuple) { // 先获取到上一个组件传递过来的数据,数据在tuple里面 String lowerName = tuple.getString(0); // 将姓名转换成大写 String upperName = lowerName.toUpperCase(); // 将转换完成的商品名发送出去 collector.emit(new Values(upperName)); } public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector) { this.collector = collector; } /** * 声明该bolt组件要发出去的tuple的字段 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("upperName")); } }
import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.UUID; /** * Created by shea on 2018/2/2. */ public class AppendBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; FileWriter fileWriter = null; public void execute(Tuple tuple) { // 先拿到上一个组件发送过来的姓名 String upperName = tuple.getString(0); String suffix_name = upperName + "_csdn"; try { fileWriter.write(suffix_name); fileWriter.write("\n"); fileWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } } /** * 在bolt组件运行过程中只会被调用一次 */ public void prepare(Map conf, TopologyContext context, OutputCollector collector) { try { //fileWriter = new FileWriter("C:\\Users\\91BGJK2\\Desktop\\storm\\" + UUID.randomUUID()); fileWriter = new FileWriter("/mnt/" + UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } /** * 本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段 */ public void declareOutputFields(OutputFieldsDeclarer arg0) { }
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; /** * storm的测试程序 * Created by shea on 2018/2/2. */ public class StormMain { public static void main(String[] args) throws Exception { //拓扑构造器 TopologyBuilder builder = new TopologyBuilder(); // 将我们的spout组件设置到topology中去 // parallelism_hint :4 表示用4个excutor来执行这个组件-----应该是表示4个线程 // setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task builder.setSpout("randomspout", new RandomNameSpout(), 4).setNumTasks(8); // 将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息 // .shuffleGrouping("randomspout")包含两层含义: // 1、upperbolt组件接收的tuple消息一定来自于randomspout组件 // 2、randomspout组件和upperbolt组件的大量并发 // task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); // 将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息 builder.setBolt("suffixbolt", new AppendBolt(), 4).shuffleGrouping("upperbolt"); // 用builder来创建一个topology StormTopology stormTopologyDemo = builder.createTopology(); // 配置一些topology在集群中运行时的参数 Config conf = new Config(); // 这里设置的是整个StormTopologyDemo所占用的槽位数,也就是worker的数量 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("StormTopologyDemo",conf,stormTopologyDemo); // 将这个topology提交给 storm集群运行 StormSubmitter.submitTopology("StormTopologyDemo", conf, stormTopologyDemo); } }
单机启动:
直接可以在程序中运行(调用cluster.submitTopology("StormTopologyDemo",conf,stormTopologyDemo);),如果使用集群运行,需要打包到集群上,使用指令运行即可
./storm jar /mnt/storm-1.0-SNAPSHOT.jar StormMain