源码阅读——hadoop yarn之AsyncDispatcher

最近看了读到了关于hadoop中yarn的编程模型的文章 http://dongxicheng.org/mapreduce-nextgen/programming-model/,想深入了解下它的事件机制是怎么实现的,就看了看其中的AsyncDispatcher类的源码,幸好该类涉及到的其它类不多,读起来也不算吃力

我们先来看其继承关系,AsyncDispatcher继承了AbstractService类,实现了Dispatcher接口,其中Dispatcher接口主要是定义了两个方法register以及getEventHandler,以及一个用于配置事件循环是否在错误后推出的配置,不用太关注这个。

重点看下AbstractService,AbstractService类实现了Service接口,AbstractService中我们主要关注几个方法:

服务生命周期部分:init,start,stop,serviceInit,serviceStart,serviceStop

服务监听器部分:registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener

其中init,start,stop是继承自service接口的,是暴露给用户用于初始化,启动和停止服务的接口,而serviceInit,serviceStart,serviceStop是空方法,在init,start,stop中被调用,用于子类继承重写,从而自定义自己的初始化,启动以及停止操作。

那么为什么不直接让子类重写init,start,stop方法呢,因为init,start,stop除了调用你们自定义的启动过程,还做了一些其它的事,就是改变service的状态以及通知其它监听了该服务的回掉,其中registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener就是用于注册服务状态回掉的接口,注意,此处的的回掉不是事件回掉,而是服务状态改变时的回掉,每个服务都有该功能,不要和后文的AbstractService的事件回掉弄混了。

大致了解了AbstractService的功能是提供自定义的服务初始化,启动和停止的接口以及维持服务状态+状态改变后通知接听者,我们接下来就可以开始看AbstractService了,对于AbstractService的大致流程我们大致可以在脑中想一下,不外乎内部有一个事件队列,同时有一个线程不断的从事件队列里取事件,然后根据注册情况进行分发,大致思路就是这样的。

我们重点是看看具体的实现,首先我们看创建取事件的线程的

Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
//        首先定义两个循环终止条件,stopped变量以及线程被Interrupt,其中stopped是volatile,保证可以及时的看到改变
        while (!stopped && !Thread.currentThread().isInterrupted()) {
//          任务队列是否为空
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
//                任务队列为空才提醒,waitForDrained就是一个Object的实例,用于做线程间通信而已
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
//            用阻塞的模式从事件队列的取事件
            event = eventQueue.take();
          } catch(InterruptedException ie) {
//            如果take被中断,且不是service被停止导致的,那么log打印一句警告
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
//            分发事件
            dispatch(event);
          }
        }
      }
    };
  }

大致就是做了take事件,然后dispatch事件的操作,其它操作则是为了保证线程优雅推出,以及推出前完成当前事件队列里事件的分发

接下来就是dispatch方法了

protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

//    获取事件类型
    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
//      获取相应的handler,EventHandler是个接口
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
//        调用handler的handle方法,
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }

这个没啥好说的,注意一点就是,dispatch方法是直接调用了handler的handle方法的,所以如果handle里会出现要处理长时间的任务,务必记得新开一个线程执行,不然会影响eventloop的

接下来看一下用于注册事件的接口

public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

这个纯粹看英文注释就知道注册事件时碰到的三种情况(事件未注册过,事件已注册过但事件的handler不是符合handler,事件已注册过且事件的handler是复合handler)下进行的操作了,提一句MultiListenerHandler是实现了EventHandler接口的类,代表符合handler,里面维持了一个handler的list

接下来看看怎么触发事件,正常来说就是将事件推入事件队列即可,但是hadoop这里的实现让我有点不太明白为什么要这么做,希望有人可以指导一下

首先我们看一下项目里别人是怎么触发事件的

dispatcher.getEventHandler().handle(event);

然后我们看一下getEventHandler方法

@Override
public EventHandler getEventHandler() {
  return handlerInstance;
}

handlerInstance则是

private final EventHandler handlerInstance = new GenericEventHandler();

而GenericEventHandler则是实现了EventHandler接口的一个内部类

class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }
      drained = false;

      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize != 0 && qSize % 1000 == 0
          && lastEventQueueSizeLogged != qSize) {
        lastEventQueueSizeLogged = qSize;
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        // Need to reset drained flag to true if event queue is empty,
        // otherwise dispatcher will hang on stop.
        drained = eventQueue.isEmpty();
        throw new YarnRuntimeException(e);
      }
    };
  }

没错,我们终于在GenericEventHandler的handle方法里看到了将事件推入事件队列的操作了,不太理解为什么要绕这么一个大弯,望有人解答。handle里还做了检查事件队列剩余大小的工作。

最后我们看一下AsyncDispatcher的停止服务的方法

@Override
  protected void serviceStop() throws Exception {
//    用一个标准位表示是否要处理完事件队列里的事件,该标志位是有接口可以更改的
    if (drainEventsOnStop) {
//      将blockNewEvents设为true,禁止再向事件队列里添加事件了
      blockNewEvents = true;
      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
      long endTime = System.currentTimeMillis() + getConfig()
          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);

      synchronized (waitForDrained) {
        while (!drained && eventHandlingThread != null
            && eventHandlingThread.isAlive()
            && System.currentTimeMillis() < endTime) {
//          等待事件循环线程将事件队列里的任务处理完
          waitForDrained.wait(1000);
          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
              eventHandlingThread.getState());
        }
      }
    }
//    设置服务停止标志位为true
    stopped = true;
    if (eventHandlingThread != null) {
//      中断事件循环线程
      eventHandlingThread.interrupt();
      try {
//        等待线程结束
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.serviceStop();
  }

猜你喜欢

转载自blog.csdn.net/tjq980303/article/details/80384511