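The previous side note covered Yarn's event dispatching. That large-scale dispatching mainly works between classes: put simply, when a class meets something it cannot handle itself, it hands the event to someone else.
Many Yarn classes hold a dispatcher, and most of them hold the global one, so anything they cannot process is passed straight to the dispatcher of the RM or the NM.
The global dispatcher acts as a coordinator, a thread that strings all the classes together. Many classes also handle events internally, and that internal mechanism, the state machine, is what this post discusses.
Note: some of the images in this post come from Mr. Dong's blog and book, so here is a plug for his blog:
Now let's take a careful look at state machines, starting with two classes:
1: StateMachine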
public interface StateMachine<STATE extends Enum<STATE>,
                              EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  public STATE getCurrentState();

  public STATE doTransition(EVENTTYPE eventType, EVENT event)
      throws InvalidStateTransitonException;
}
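This is the basic definition of a state machine: one interface with two methods. getCurrentState returns the machine's current state, and doTransition performs a state transition. How they are called is covered when we analyze a concrete class.
2: StateMachineFactory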
/**
 * @param <OPERAND> The object type on which this state machine operates.
 * @param <STATE> The state of the entity.
 * @param <EVENTTYPE> The external eventType to be handled.
 * @param <EVENT> The event object.
 *
 */
@Public
@Evolving
final public class StateMachineFactory<OPERAND, STATE extends Enum<STATE>,
                                       EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
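The comment is straightforward: OPERAND is the entity the state machine belongs to, STATE is the type of that entity's states, EVENTTYPE is the type of event to be handled, and EVENT is the event object.
As the name suggests, the state machine factory is the class that produces state machines. We start with its fields and leave the methods for the walkthrough of a concrete class: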
// Linked list of the registered transitions
private final TransitionsListNode transitionsListNode;

// Covered in detail when we reach it
private Map<STATE, Map<EVENTTYPE,
    Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> stateMachineTable;

// Initial state
private STATE defaultInitialState;

// Covered when we reach it
private final boolean optimized;
Next, let's pick a state machine and analyze it.
3: The state machine in RMAppAttemptImpl:
private final StateMachine<RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachine;
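The declaration of the state machine needs little comment.
Start from StateMachineFactory. The code that builds the factory is very long, so look at its first part: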
private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent>( RMAppAttemptState.NEW)
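Seen against the definition of StateMachineFactory, the type arguments of this factory are:
- RMAppAttemptImpl: the entity class this factory targets; every operation triggered by a transition acts on this class
- RMAppAttemptState: the enum of states an RMAppAttemptImpl can be in
- RMAppAttemptEventType: the type of event that can trigger a state change; for this class it is RMAppAttemptEventType, and events of these types drive the changes in RMAppAttemptImpl
- RMAppAttemptEvent: the event object that triggers the transition
The factory's constructor: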
public StateMachineFactory(STATE defaultInitialState) {
  this.transitionsListNode = null;
  this.defaultInitialState = defaultInitialState;
  this.optimized = false;
  this.stateMachineTable = null;
}
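Comparing the two, the factory we just created has RMAppAttemptState.NEW as its default state, and every later transition builds on it. In other words, defaultInitialState is now RMAppAttemptState.NEW.
Moving on: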
addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition())
This calls a method of StateMachineFactory. Here it is:
/**
 * @return a NEW StateMachineFactory just like {@code this} with the current
 *          transition added as a new legal transition
 *
 *         Note that the returned StateMachineFactory is a distinct object.
 *
 *         This method is part of the API.
 *
 * @param preState pre-transition state
 * @param postState post-transition state
 * @param eventType stimulus for the transition
 * @param hook transition hook
 */
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> addTransition(
    STATE preState, STATE postState, EVENTTYPE eventType,
    SingleArcTransition<OPERAND, EVENT> hook) {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(
      this,
      new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(
          preState, eventType, new SingleInternalArc(postState, hook)));
}
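StateMachineFactory has several overloaded addTransition methods, and we will go through them one by one, starting with this one. It constructs a new state machine factory and a new ApplicableSingleOrMultipleTransition, so let's take them apart.
First the constructor: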
private StateMachineFactory(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode
      = new TransitionsListNode(t, that.transitionsListNode);
  this.optimized = false;
  this.stateMachineTable = null;
}
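For the factory inside RMAppAttemptImpl, defaultInitialState is RMAppAttemptState.NEW, and the arguments we passed to addTransition are:
preState : RMAppAttemptState.NEW
postState : RMAppAttemptState.SUBMITTED
eventType : RMAppAttemptEventType.START
Keep those in mind and look at the second argument passed to the constructor. Judging by its name it is a single-arc or multiple-arc transition, so here is its constructor: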
ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
  this.preState = preState;
  this.eventType = eventType;
  this.transition = transition;
}
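The constructor is simple and clear: three fields. preState is the state before the transition, eventType is the event type that triggers it, and transition is the arc to follow.
The third one, SingleInternalArc, deserves a closer look: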
private class SingleInternalArc
    implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState,
      SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  // doTransition(...) omitted in this excerpt
}
This constructor is again plain field assignment, but it involves a hook, a transition hook:
/**
 * Hook for Transition. This lead to state machine to move to the post state as
 * registered in the state machine.
 */
@Public
@Evolving
public interface SingleArcTransition<OPERAND, EVENT> {
  /**
   * Transition hook.
   *
   * @param operand the entity attached to the FSM, whose internal
   *                state may change.
   * @param event causal event
   */
  public void transition(OPERAND operand, EVENT event);
}
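The comment is brief. The hook is the action that runs as part of a transition: when an event fires, the hook is invoked on the operand, and the state machine then moves to the post state registered for that arc.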
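To make the division of labor concrete, here is a minimal sketch of an arc that owns a fixed post state and an optional hook. It is not the Hadoop code: HookSketch, Hook, Arc, and fire are hypothetical names, and the operand is just a list used as a log. The sketch assumes the hook only performs side effects while the arc decides the next state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is an illustration, not Hadoop code.
class HookSketch {
    enum State { NEW, SUBMITTED }

    /** Side-effect callback, similar in spirit to SingleArcTransition. */
    interface Hook<O, E> {
        void transition(O operand, E event);
    }

    /** One arc: a fixed target state plus an optional hook. */
    static final class Arc<O, E> {
        private final State postState;
        private final Hook<O, E> hook;

        Arc(State postState, Hook<O, E> hook) {
            this.postState = postState;
            this.hook = hook;
        }

        /** Runs the hook (if any), then reports the fixed target state. */
        State fire(O operand, E event) {
            if (hook != null) {
                hook.transition(operand, event);
            }
            return postState;
        }
    }

    /** Fires an arc to SUBMITTED whose hook records the event in a log. */
    static String demo() {
        List<String> log = new ArrayList<>();
        Arc<List<String>, String> arc = new Arc<List<String>, String>(
            State.SUBMITTED,
            (operand, event) -> operand.add("handled " + event));
        State next = arc.fire(log, "START");
        return next + ":" + log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints SUBMITTED:[handled START]
    }
}
```

The hook never returns a state. In this sketch the target state is fixed when the arc is registered, which is what makes the arc a single-arc transition.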
At this point we can propagate the arguments back up:
For the SingleArcTransition<OPERAND, EVENT> defined here, the actual OPERAND is RMAppAttemptImpl and the event object is RMAppAttemptEvent, as we can see:
private static final class AttemptStartedTransition extends BaseTransition {
  @Override
  public void transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {

    boolean transferStateFromPreviousAttempt = false;
    if (event instanceof RMAppStartAttemptEvent) {
      transferStateFromPreviousAttempt =
          ((RMAppStartAttemptEvent) event)
              .getTransferStateFromPreviousAttempt();
    }
    appAttempt.startTime = System.currentTimeMillis();

    // Register with the ApplicationMasterService
    appAttempt.masterService
        .registerAppAttempt(appAttempt.applicationAttemptId);

    if (UserGroupInformation.isSecurityEnabled()) {
      appAttempt.clientTokenMasterKey =
          appAttempt.rmContext.getClientToAMTokenSecretManager()
              .createMasterKey(appAttempt.applicationAttemptId);
    }

    // Add the applicationAttempt to the scheduler and inform the scheduler
    // whether to transfer the state from previous attempt.
    appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
  }
}
Starting from AttemptStartedTransition keeps things clear. We skip its logic for now; it is a class that extends BaseTransition:
private static class BaseTransition implements
    SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {

  @Override
  public void transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
  }
}
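BaseTransition in turn implements SingleArcTransition, which is where we find the OPERAND the hook works on and the event object:
So for the ApplicableSingleOrMultipleTransition passed in above, the four type parameters, with the concrete values for this call, are:
OPERAND : RMAppAttemptImpl
STATE : RMAppAttemptState (preState is RMAppAttemptState.NEW)
EVENTTYPE : RMAppAttemptEventType (eventType is RMAppAttemptEventType.START)
EVENT : RMAppAttemptEvent
Good. Now go up one more level: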
new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(
    this,
    new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(
        preState, eventType, new SingleInternalArc(postState, hook)))

private StateMachineFactory(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode
      = new TransitionsListNode(t, that.transitionsListNode);
  this.optimized = false;
  this.stateMachineTable = null;
}
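Here is what the newly created factory looks like:
- The default state is unchanged
- A new TransitionsListNode is created. The previous node was null, so let's look at the TransitionsListNode constructor: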
TransitionsListNode(
    ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
    TransitionsListNode next) {
  this.transition = transition;
  this.next = next;
}
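In essence this builds a linked list by prepending a head node. For this first call, the new TransitionsListNode has transition set to the transition we passed in and next set to null.
The factory that comes back is also easy to describe: nothing else has changed, but the head of its transitionsListNode is now the newly added transition:
That transition says: when the OPERAND (RMAppAttemptImpl) is in STATE RMAppAttemptState.NEW and an external event arrives whose type (EVENTTYPE) is RMAppAttemptEventType.START, an operation is triggered and the state machine's state becomes RMAppAttemptState.SUBMITTED.
Note that this is a single-arc transition. In parameter order, it specifies:
- The state before the transition
- The state after the transition
- The event type that triggers the transition
- The operation that runs when the transition is triggered
Each call to addTransition returns a new factory with one more transition registered. The transition specifies the preState, the postState, the triggering event type, and the operation to run on the object that owns the state machine.
As mentioned earlier, addTransition is overloaded, with five variants. Only this one is covered for now so that the overall mechanism can be explained end to end; the other four will be covered later.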
.addTransition(RMAppAttemptState.KILLED, RMAppAttemptState.KILLED,
    RMAppAttemptEventType.CONTAINER_FINISHED,
    new ContainerFinishedAtFinalStateTransition())
.installTopology();
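This is the last statement in the factory's construction. From the analysis above, each call to addTransition adds one transition, as defined above, to the transitionsListNode held by the factory, and each transition is inserted at the head of the list.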
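The head insertion can be sketched in a few lines. This is a simplified stand-in, not the Hadoop classes: HeadInsertSketch, Builder, and Node are hypothetical names, and the string labels only mark the order in which entries were registered. It assumes, as in the excerpts above, that every add returns a new object and leaves the old one untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; a simplified stand-in for the factory's linked list.
class HeadInsertSketch {
    /** Singly linked node; next points at the entry registered just before. */
    static final class Node {
        final String label;
        final Node next;

        Node(String label, Node next) {
            this.label = label;
            this.next = next;
        }
    }

    /** Immutable builder: add() never mutates, it returns a new builder. */
    static final class Builder {
        final Node head;

        Builder(Node head) {
            this.head = head;
        }

        Builder add(String label) {
            return new Builder(new Node(label, head));
        }

        /** Labels in list order, starting at the head. */
        List<String> fromHead() {
            List<String> out = new ArrayList<>();
            for (Node n = head; n != null; n = n.next) {
                out.add(n.label);
            }
            return out;
        }
    }

    static List<String> demo() {
        Builder empty = new Builder(null);
        Builder full = empty.add("first").add("second").add("last");
        return full.fromHead();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [last, second, first]
    }
}
```

Walking from the head yields the entries in reverse registration order, and the original empty builder still has a null head after the calls.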
Finally, the installTopology method:
public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> installTopology() {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}

private StateMachineFactory(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
    boolean optimized) {
  this.defaultInitialState = that.defaultInitialState;
  this.transitionsListNode = that.transitionsListNode;
  this.optimized = optimized;
  if (optimized) {
    makeStateMachineTable();
  } else {
    stateMachineTable = null;
  }
}
Because optimized is true here, makeStateMachineTable is called:
private void makeStateMachineTable() {
  Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
      new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
      prototype = new HashMap<STATE, Map<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

  prototype.put(defaultInitialState, null);

  // I use EnumMap here because it'll be faster and denser. I would
  // expect most of the states to have at least one transition.
  stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE,
      Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  for (TransitionsListNode cursor = transitionsListNode;
       cursor != null;
       cursor = cursor.next) {
    stack.push(cursor.transition);
  }

  while (!stack.isEmpty()) {
    stack.pop().apply(this);
  }
}
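Read this method carefully; it is quite clear. It walks the list from the head and pushes every transition onto a stack. A stack is last in, first out, so popping reverses the order again, and apply is called on each transition as it is popped. Here is apply: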
@Override
public void apply(
    StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
      = subject.stateMachineTable.get(preState);
  if (transitionMap == null) {
    // I use HashMap here because I would expect most EVENTTYPE's to not
    // apply out of a particular state, so FSM sizes would be
    // quadratic if I use EnumMap's here as I do at the top level.
    transitionMap = new HashMap<EVENTTYPE,
        Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
    subject.stateMachineTable.put(preState, transitionMap);
  }
  transitionMap.put(eventType, transition);
}
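Note that subject here is the state machine factory itself. The list was built by head insertion, so its head is the transition added last (the one whose preState is RMAppAttemptState.KILLED). That transition is pushed first and therefore popped last, which means apply runs in the original registration order and the first preState applied is RMAppAttemptState.NEW.
The first lookup still returns null, because makeStateMachineTable mapped defaultInitialState to null. A new, empty HashMap is therefore created and stored in stateMachineTable under the key RMAppAttemptState.NEW, and the last statement puts an entry into transitionMap: the key is the event type (RMAppAttemptEventType.START) and the value is the corresponding transition.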
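The effect of the stack on ordering is easy to check in isolation. The sketch below is an assumption-laden illustration, not the Hadoop code: ReplayOrderSketch and replay are hypothetical names, an ArrayDeque stands in for java.util.Stack, and strings stand in for transitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical names; strings stand in for the registered transitions.
class ReplayOrderSketch {
    /**
     * Takes entries in head-first list order, pushes them all onto a stack,
     * then pops them; returns the order in which they would be applied.
     */
    static List<String> replay(List<String> fromHead) {
        Deque<String> stack = new ArrayDeque<>();
        for (String t : fromHead) {
            stack.push(t);
        }
        List<String> applied = new ArrayList<>();
        while (!stack.isEmpty()) {
            applied.add(stack.pop());
        }
        return applied;
    }

    static List<String> demo() {
        // Head insertion leaves the last registered entry at the head.
        return replay(Arrays.asList("registered 3rd", "registered 2nd", "registered 1st"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints [registered 1st, registered 2nd, registered 3rd]
    }
}
```

Two reversals cancel out, so entries are applied in the order they were registered. One consequence, given that apply ends with a plain put, is that a later registration for the same preState and event type would overwrite an earlier one.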
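Repeating this for every transition yields a two-level structure:
- The key of stateMachineTable is a pre-transition state
- The value is a Map whose key is the triggering event type and whose value is the corresponding transition.
This goes on until every transition has been applied.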
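The two-level structure can be sketched with plain collections. This is an illustration under stated assumptions: TableSketch, register, and the three arcs are made up for the example and are not the real RMAppAttemptImpl topology, and the inner map stores the post state directly instead of a Transition object.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical names and arcs; not the real RMAppAttemptImpl topology.
class TableSketch {
    enum State { NEW, SUBMITTED, KILLED }
    enum EventType { START, KILL, CONTAINER_FINISHED }

    /** Mirrors the shape of apply(): create the inner map lazily, then put. */
    static void register(Map<State, Map<EventType, State>> table,
                         State pre, EventType type, State post) {
        Map<EventType, State> byEvent = table.get(pre);
        if (byEvent == null) {
            byEvent = new HashMap<>();
            table.put(pre, byEvent);
        }
        byEvent.put(type, post);
    }

    /** Outer key: pre-transition state. Inner key: event type. */
    static Map<State, Map<EventType, State>> build() {
        Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
        register(table, State.NEW, EventType.START, State.SUBMITTED);
        register(table, State.NEW, EventType.KILL, State.KILLED);
        register(table, State.KILLED, EventType.CONTAINER_FINISHED, State.KILLED);
        return table;
    }

    public static void main(String[] args) {
        Map<State, Map<EventType, State>> table = build();
        System.out.println(table.get(State.NEW).get(EventType.START)); // prints SUBMITTED
        System.out.println(table.get(State.SUBMITTED));                // prints null
    }
}
```

A state with no outgoing arcs simply has no entry in the outer map, which is why a lookup has to handle a missing inner map as well as a missing event type.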
So far, though, we have only seen the state machine factory and not the state machine itself. Here it comes:
4: The state machine
this.stateMachine = stateMachineFactory.make(this);
Naturally we need to see what make does:
public StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand) {
  return new InternalStateMachine(operand, defaultInitialState);
}

private class InternalStateMachine
    implements StateMachine<STATE, EVENTTYPE, EVENT> {
  private final OPERAND operand;
  private STATE currentState;

  InternalStateMachine(OPERAND operand, STATE initialState) {
    this.operand = operand;
    this.currentState = initialState;
    if (!optimized) {
      maybeMakeStateMachineTable();
    }
  }

  // getCurrentState() and doTransition(...) omitted in this excerpt
}
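Whether this constructor builds the table depends on optimized. Factories returned by the public constructor and by addTransition have optimized set to false, but the RMAppAttemptImpl factory ends with installTopology(), which sets optimized to true and builds the table eagerly, so the branch is skipped in our case. For a factory that never called installTopology, the table is built lazily on the first make: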
private synchronized void maybeMakeStateMachineTable() {
  if (stateMachineTable == null) {
    makeStateMachineTable();
  }
}
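Even then, it only runs the makeStateMachineTable logic we already walked through, and only when the table is still null. The InternalStateMachine that comes back therefore carries two fields:
- operand : the instance the state machine belongs to; note that it is an instance, not a class
- currentState : initialized to the factory's defaultInitialState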
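A minimal sketch of that split between shared topology and per-instance state follows. It is not the Hadoop implementation: SharedTableSketch, Factory, Machine, and handle are hypothetical names, the operand is a plain string, and error handling for unregistered events is deliberately left out.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names; error handling for unknown events is omitted.
class SharedTableSketch {
    enum State { NEW, SUBMITTED }
    enum EventType { START }

    static final class Factory {
        // One table per factory, shared by every machine it makes.
        private final Map<State, Map<EventType, State>> table =
            new EnumMap<>(State.class);

        Factory() {
            Map<EventType, State> fromNew = new EnumMap<>(EventType.class);
            fromNew.put(EventType.START, State.SUBMITTED);
            table.put(State.NEW, fromNew);
        }

        Machine make(String operand) {
            return new Machine(operand, State.NEW);
        }

        /** Inner class: holds only the operand and the current state. */
        final class Machine {
            final String operand;
            private State current;

            Machine(String operand, State initial) {
                this.operand = operand;
                this.current = initial;
            }

            State getCurrentState() {
                return current;
            }

            State handle(EventType type) {
                // Looks up the enclosing factory's table, not a private copy.
                current = table.get(current).get(type);
                return current;
            }
        }
    }

    static String demo() {
        Factory factory = new Factory();
        Factory.Machine a = factory.make("attempt-1");
        Factory.Machine b = factory.make("attempt-2");
        a.handle(EventType.START);
        return a.getCurrentState() + "," + b.getCurrentState();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints SUBMITTED,NEW
    }
}
```

Moving one machine does not affect the other, even though both consult the same table. That is the reason a factory can be a static field while each object gets its own machine from make.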
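That looks like the end, but we still need to follow the calls to see how the state machine produces the effect we want.
The calling code lives inside the state machine: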
@Override
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitonException {
  currentState = StateMachineFactory.this.doTransition(
      operand, currentState, eventType, event);
  return currentState;
}
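As you can see, InternalStateMachine delegates to the private doTransition method of the enclosing StateMachineFactory and stores the returned state as its current state.
Put plainly: the factory owns a two-level table, from state to event type to transition, and every state machine it makes shares that table. Each InternalStateMachine only holds its operand and its current state. A transition describes what to do to the operand when a given event type arrives in a given state, and the state machine then replaces its current state with the state the transition returns.
Let's look at an actual transition: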
/**
 * Effect a transition due to the effecting stimulus.
 *
 * @param state current state
 * @param eventType trigger to initiate the transition
 * @param cause causal eventType context
 * @return transitioned state
 */
private STATE doTransition(OPERAND operand, STATE oldState,
    EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitonException {
  // We can assume that stateMachineTable is non-null because we call
  // maybeMakeStateMachineTable() when we build an InnerStateMachine ,
  // and this code only gets called from inside a working InnerStateMachine .
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
      = stateMachineTable.get(oldState);
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
        = transitionMap.get(eventType);
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);
    }
  }
  throw new InvalidStateTransitonException(oldState, eventType);
}
The comments make the purpose clear: perform the transition and return the new state, which the caller assigns as the current state.
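The lookup and its failure path can be tried out with a reduced table. This is a sketch under assumptions, not the Hadoop method: LookupSketch is a hypothetical name, IllegalStateException stands in for InvalidStateTransitonException, and the table stores post states directly instead of Transition objects.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names; IllegalStateException stands in for Hadoop's
// InvalidStateTransitonException.
class LookupSketch {
    enum State { NEW, SUBMITTED }
    enum EventType { START, KILL }

    static final Map<State, Map<EventType, State>> TABLE =
        new EnumMap<>(State.class);

    static {
        Map<EventType, State> fromNew = new EnumMap<>(EventType.class);
        fromNew.put(EventType.START, State.SUBMITTED);
        TABLE.put(State.NEW, fromNew);
    }

    /** Two-step lookup: state first, then event type; otherwise reject. */
    static State doTransition(State oldState, EventType type) {
        Map<EventType, State> byEvent = TABLE.get(oldState);
        if (byEvent != null) {
            State next = byEvent.get(type);
            if (next != null) {
                return next;
            }
        }
        throw new IllegalStateException(
            "Invalid event " + type + " at " + oldState);
    }

    static String demo() {
        StringBuilder out = new StringBuilder();
        out.append(doTransition(State.NEW, EventType.START));
        try {
            // SUBMITTED has no outgoing arcs in this table.
            doTransition(State.SUBMITTED, EventType.START);
            out.append(",accepted");
        } catch (IllegalStateException e) {
            out.append(",rejected");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints SUBMITTED,rejected
    }
}
```

Both a missing state and a missing event type end in the same exception, so a caller cannot tell the two apart from the exception type alone.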
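State machines have to be understood from the source code. That is my firm conviction, because the source uses state machines in so many places, and understanding them makes reading it far easier.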