本文以Nimbus启动流程为线索,解读对应的Storm源码。[此处代码版本为storm-0.9.3]
1. 从Python脚本开始
Nimbus 的启动从一句 “storm nimbus” 指令开始,这句指令通过 STORM_DIR/bin/storm 这个Python 脚本来运行 Java 类 backtype.storm.daemon.nimbus,从控制台的输出可以看到具体地是执行如下格式的一条指令:
java -server -Dstorm.options= -Dstorm.home=$STORM_DIR -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp $STORM_CLASSPATH -Xmx1024m -Dlogfile.name=nimbus.log -Dlogback.configurationFile=$STORM_DIR/logback/cluster.xml backtype.storm.daemon.nimbus
-D后面接的都是定义的变量,可在程序中用 System.getProperty("变量名") 得到其值。-Xmx1024m指定程序运行的最大堆栈为1GB,最后加载的类为backtype.storm.daemon.nimbus,从该类的main函数开始运行。
2. 进入Java 的 nimbus 类
这个类是在 nimbus.clj 中定义的,部分代码如下。文件头的 ns 一句说明该文件将生成一个 Java 类,并且该类除了 main 函数外还有另一个静态函数叫 launch,也就是代码中的 -launch 函数。我们从其 main 函数看起,在代码的底部:
(ns backtype.storm.daemon.nimbus ;;...... (:gen-class :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]])) (defn launch-server! [conf nimbus] (validate-distributed-mode! conf) (let [service-handler (service-handler conf nimbus) options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) (THsHaServer$Args.) (.workerThreads 64) (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE))) (.processor (Nimbus$Processor. service-handler)) ) server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))] (add-shutdown-hook-with-force-kill-in-1-sec (fn [] (.shutdown service-handler) (.stop server))) (log-message "Starting Nimbus server...") (.serve server))) (defn -launch [nimbus] (launch-server! (read-storm-config) nimbus)) (defn standalone-nimbus [] (reify INimbus (prepare [this conf local-dir] ) (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments] (->> supervisors (mapcat (fn [^SupervisorDetails s] (for [p (.getMeta s)] (WorkerSlot. (.getId s) p)))) set )) (assignSlots [this topology slots] ) (getForcedScheduler [this] nil ) (getHostName [this supervisors node-id] (if-let [^SupervisorDetails supervisor (get supervisors node-id)] (.getHost supervisor))) )) (defn -main [] (-launch (standalone-nimbus)))
第45行,
main 函数只是调用 -launch 函数来处理 standalone-nimbus 的返回值。
第25-42行,standalone-nimbus 函数返回一个实现了 INimbus 接口的类的一个对象。reify 是 clojure.core 中的函数,其相当于定义一个临时的类来实现某接口,最后返回一个该类的对象。INimbus 接口定义了 Nimbus 操作所需的几个函数,这里先不用细看。
第23行,-launch 函数则是调用 launch-server! 来启动 nimbus,其中多了一个参数,就是 read-storm-config 函数的返回值。这个函数在 config.clj (storm-core/src/clj/backtype/storm/config.clj) 中定义,可以顺便看下Storm的参数是怎么进来的:
2.1 read-storm-config
(defn read-storm-config [] (let [conf (clojurify-structure (Utils/readStormConfig))] (validate-configs-with-schemas conf) conf))
第3行,clojurify-structure这个函数在util.clj (storm-core/src/clj/backtype/storm/util.clj) 中定义,其实就是把Java对象中的所有Map 、List 对象(即java.util.Map 和 java.util.List)转换成Clojure的 Map 和 Vector(即clojure.lang.PersistentArrayMap 和 clojure.lang.PersistentVector)。
Utils/readStormConfig 指的是类 backtype.storm.utils.Utils 的 readStormConfig 函数,代码在 storm-core/src/jvm/backtype/storm/utils/Utils.java中:
public static Map readStormConfig() { Map ret = readDefaultConfig(); String confFile = System.getProperty("storm.conf.file"); Map storm; if (confFile==null || confFile.equals("")) { storm = findAndReadConfigFile("storm.yaml", false); } else { storm = findAndReadConfigFile(confFile, true); } ret.putAll(storm); ret.putAll(readCommandLineOpts()); return ret; }首先读取变量storm.conf.file的值,在上面的那句java指令中可以看到这个值为空,故程序从 storm.yaml 中读取配置信息,之后再加入命令行中传入的Storm参数。
2.2 launch-server!
回到 nimbus.clj 中的代码,接下来是运行到 launch-server! 函数。
第7行,确认nimbus的启动配置中为分布式模式。validate-distributed-mode! 函数在common.clj 中定义。
第8行,调用 service-handler 函数,并将返回值绑定到名为 service-handler 的本地变量。这一行其实做了很多事情,后面再展开来讲。大概地说,service-handler 函数返回一个处理各种 nimbus 请求的对象。这些请求包括 topology 提交、topology 状态的切换、获取topology或集群信息等等。
第9-14行,绑定一个叫 options 的变量,这个是用来设置 Thrift Server 的参数,看到第13行把 service-handler 设进去了就好。这里顺便提一下 clojure 中 "->" 和 "->>" 这两个宏,这两个宏为函数式编程中的函数复合提供了方便的写法。(-> (f a) (g b c) (h d e)) 的意思就是 (h (g (f a) b c) d e),先执行(f a),将其结果作为下一个函数的第一个参数插入,以此反复。而(->> (f a) (g b c) (h d e)) 的意思就是 (h d e (g b c (f a))),即先执行(f a),将其结果作为下一个函数的最后一个参数插入,以此反复。更具体的例子可以看"->" 和"->>"。
第15行,new 一个 THsHaServer。这是 Thrift 的一种 Server。Thrift 是一个跨语言的 RPC 框架,使得不同语言实现的 Server 和 Client 可以交互工作。因此 Storm 可以用 Python 等其它语言来构造并提交 topology。
"do"这一句对 options 再作一设置,然后将 options 返回,用它作为参数来 new 一个THsHaServer。关于 Thrift 不同类型的 Server 的比较,可以看这里。
第16-18行,为本进程绑定一个shutdown-hook,即当进程被 kill 或异常退出时要执行的代码,相当于 try-catch 里的 finally。我们知道Storm是不会自己关闭的,当我们手动 kill 来关闭 nimbus 时,Storm 就要做一些清理工作。这里绑定的是一个匿名函数,分别调用 service-handler.shutdown 和 server.stop。
第20行,启动 Thrift Server,到这里 Nimbus 的启动流程就结束了,Nimbus 以一个 Thrift Server 的形式在运行。
这样上面的代码片就看完了,接下来可以进入 service-handler 函数看一下。
2.3 service-handler
service-handler的代码如下(这里由于篇幅原因对其做了部分省略):
(defserverfn service-handler [conf inimbus] (.prepare inimbus conf (master-inimbus-dir conf)) (log-message "Starting Nimbus with conf " conf) (let [nimbus (nimbus-data conf inimbus)] (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf) (cleanup-corrupt-topologies! nimbus) (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup)) (schedule-recurring (:timer nimbus) 0 (conf NIMBUS-MONITOR-FREQ-SECS) (fn [] (when (conf NIMBUS-REASSIGN) (locking (:submit-lock nimbus) (mk-assignments nimbus))) (do-cleanup nimbus) )) ;; Schedule Nimbus inbox cleaner (schedule-recurring (:timer nimbus) 0 (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS) (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS)) )) (reify Nimbus$Iface (^void submitTopologyWithOpts ...) (^void submitTopology ...) (^void killTopology ...) (^void killTopologyWithOpts ...) (^void rebalance ...) (activate ...) (deactivate ...) (beginFileUpload ...) (^void uploadChunk ...) (^void finishFileUpload ...) (^String beginFileDownload ...) (^ByteBuffer downloadChunk ...) (^String getNimbusConf ...) (^String getTopologyConf ...) (^StormTopology getTopology ...) (^StormTopology getUserTopology ...) (^ClusterSummary getClusterInfo ...) (^TopologyInfo getTopologyInfo ...) Shutdownable (shutdown [this] ...) DaemonCommon (waiting? [this] (timer-waiting? (:timer nimbus))))))
第3行,打出我们经常见到的 "Starting Nimbus with conf" 那句 log.
第4行开始,将 nimbus 变量绑定到 nimbus-data 函数的返回值上。nimbus-data 这个函数返回一个 map,里面绑定了诸多对象,比如提交 topology 的锁(submit-lock),比如做 topology 调度分配的 scheduler,再比如跟 zookeeper 交互的 client 等。nimbus-data 函数的返回值相当于一个结构体,里面包含了 nimbus 所需的属性。
第5行,调用一个 ITopologyValidator 对象的 prepare 函数。Storm 源码中只有一个 DefaultTopologyValidator 类实现了该接口,其prepare 函数没有做任何事情。这应该是 Storm 留给用户自定义的一个地方。
第6行,将zookeeper上有但nimbus本地上没有的topology信息从zookeeper上删掉。什么时候会出现这种情况呢?我觉得可能是Nimbus在kill一个topology时,本地的目录删掉了,但发给zookeeper的删除请求没有被正确处理,Nimbus就挂掉了。这样Nimbus再重启时,就会看到有topology在zookeeper上有记录,但在本地没有。
第7行,dosep 是 clojure.core 中的函数,跟 clojure.core 中的 for 很类似。只是 for 返回的是一个随性队列,于是代码不一定都会执行,而 doseq 则是专门用来执行有副作用的代码的 for。这里第7到8行,对所有运行着的 topology,即 zookeeper 上还有记录的 topology,做一个状态为 startup 的转变。这个具体是做什么可以继续追溯代码。大意是,对处于 active 和 inactive 状态的 topology,什么也不用做;对处于 killed 状态的 topology,Nimbus 重新开始 kill;对处于 rebalancing 的 topology,Nimbus 重新开始 rebalance。这两行其实就是考虑到 Nimbus 重启的情况,要让 Nimbus 继续上回挂掉前的工作。
第9~17行,设置一个定期执行的任务(这里是一个匿名函数),给定了初次调用的延迟(这里为0,即立即调用),还有之后每次调用的间隔(这里为nimbus.monitor.freq.secs)。Nimbus 有一个 timer 线程,专门负责执行这类的定期任务或一定延迟后要执行的任务(如若干秒后再 kill 一个 topology)。这里传入的匿名函数主要负责对各个 topology 的监控,其会让 Nimbus 对所有 topology 做重新分配(如果配置信息中有设定的话),然后再清理掉 Nimbus 上无效的信息。定期监控的目的是为了应对各个部件可能挂掉的情况,比如有 supervisor 挂掉了,则 Nimbus 对 topology 重新分配后,会把该 supervisor 上的任务放到其它 supervisor 上。
第19~24行,同样设置一个定期执行的任务,清理 Nimbus 上的 inbox 文件夹,里面放的是上传到 Nimbus 的各个 topology 的 jar 包。Nimbus 会定期清理没用的 jar 包。
第25~50行,这里显示的代码做了很多省略,这是一段很长的 reify,实现了 Nimbus$Iface、Shutdownable 和 DaemonCommon 三个接口。service-handler 函数最后返回的也就是这个类的一个对象。Nimbus$Iface 的函数都是 Nimbus 应该提供的各种服务;Shutdownable 接口就一个 shutdown 函数,定义了 Nimbus 该怎样退出;DaemonCommon 接口也只有一个 waiting? 函数,主要为测试所用。各个接口实现的代码我们在后续文章再具体讨论。
细心的读者可能会发现,service-handler 函数的定义用的是 defserverfn 而不是 defn。这其实是 Storm 自定义的一个宏,最后再来说一下这个宏,它是在 common.clj 中定义的,代码如下面的 1~11 行:
(defmacro defserverfn [name & body] `(let [exec-fn# (fn ~@body)] (defn ~name [& args#] (try-cause (apply exec-fn# args#) (catch InterruptedException e# (throw e#)) (catch Throwable t# (log-error t# "Error on initialization of server " ~(str name)) (exit-process! 13 "Error on initialization") ))))) (defserverfn service-handler [conf inimbus] ...) (macroexpand `(defserverfn service-handler [conf inimbus] ...)) (let [exec-fn (fn [conf inimbus] ...)] (defn service-handler [& args] (try-cause (apply exec-fn args) (catch java.lang.InterruptedException e (throw e)) (catch java.lang.Throwable t (log-error t "Error on initialization of server " "service-handler") (exit-process! 13 "Error on initialization")))))
这里还要注意一点,就是第22行的 try-cause 也是 Storm 自定义的宏,这个宏使用的范围很广,其跟 try 是不一样的!try-cause 这个宏在 util.clj 中定义,其比 try 要强大多了。try 之后的 catch 捕捉代码块里抛出的异常,只对这个异常的类型做匹配,而 try-cause 之后的 catch 则会匹配这个异常及其所有源头(cause)。举个例子,代码块里调用的函数里抛出了 InterruptException,然后其外层的函数捕捉到了,以其为参数 new 了一个 RuntimeException 抛出来。这里 try-cause 的 catch InterruptException 就能捕捉到这个异常,而 try 只能通过 catch RuntimeException 来捕捉这个异常(设为e),然后再看 e.getCause() 是否是 InterruptException 来识别这种情况。说了这么多,这样做有什么意义呢?其实 Storm 里的线程有一种退出机制是通过抛 Exception 来实现的,如果是正常退出,最根源的地方会抛出 InterruptException,异常退出则是抛出其它类型的Exception;上层的函数捕捉到这些异常后,统一套一层 RuntimeException 往外抛。到最最外面的 try-cause 语句,这时已经很难确定这个源头被套了几层其它的 Exception 了,而为了判断线程是否是正常退出,还是需要知道这个源头的类型。如果用 try 的话,就得一层层剥开这个 Exception,其实写出来也就是 try-cause 这个宏了。
3. 总结
最后总结一下,Nimbus 的启动流程大致如此:
- "storm nimbus" 指令通过 Python 脚本调用 Java 的 nimbus 类
- 清除 nimbus 本地上没有但 zookeeper 上有的 topology 记录
- 继续 zookeeper 上记录着的 topology 的状态转换,如继续 kill、继续 rebalance 等
- 开启监控 topology 的定期任务(对 topology 做任务的重新分配)
- 开启清理 nimbus 本地无用 jar 包的定期任务
- 将构造好的 service-handler 传给 Thrift Server
- 为本进程添加一个 shutdown hook
- 启动 Thrift Server,Nimbus 以一个 Thrift Server 的形式在运行