YARN中MRAppMaster的事件驱动模型与状态机处理消息过程的分析

在MRv1中,对象之间的作用关系是基于函数调用实现的,当一个对象向另外一个对象传递消息时,会直接采用函数调用的方式,并且这个过程是串行的。比如,当TaskTracker需要执行一个Task的时候,将首先下载Task依赖的文件(JAR包,二进制文件等,字典文件等),然后执行Task。在整个过程中,下载依赖文件是阻塞式的,也就是说,前一个任务未完成文件下载之前,后一个新任务将一直处于等待状态,只有在下载完成之后,才会启动一个独立进程运行该任务。基于函数调用式的编程模型是低效的,它隐含着整个过程是串行,同步进行的。

相比之下,MRv2引入的时间驱动变成模型则是一种更加高效的方式。在基于事件驱动的编程模型中,所有对象被抽象成了事件处理器,而事件处理器之间通过事件相互关联。每种事件处理一种类型的事件,同时根据需要出发另外一种事件。相比于基于函数调用的编程模型,这种编程方式具有异步并发等特点,更加高效,更加适合大型分布式系统。


下面,我们以负责控制MapReduce作业的AppllicationMaster,也就是MRAppMaster为例,看一下整个事件驱动模型与状态机。

首先,根据源代码,我们可以看到在MRAppMaster中有一个最重要的中央异步消息调度器 AsyncDispatcher,它负责整个MRAppMaster模块的消息调度。除此之外,还有多个其他的消息调度器,比如 JobEventDispatcher, TaskEventDispatcher,TaskAttemptEventDispatcher,SpeculatorEventDispatcher 等等。在MRAppMaster的初始时,在serviceInit() 函数中有以下关键代码

this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
dispatcher.register(CommitterEventType.class, committerEventHandler);

这里的意思是,当中央消息调度器dispatcher如果收到JobEventType的消息后,将会把这个消息转发给名为jobEventDispatcher的JobEventDispatcher的消息调度器。这个JobEventDispatcher是专门用来处理JobEventType事件的消息处理器。底层实现其实就是将一个pair<JobEventType.class, JobEventDispatcher>这个KV放入中央消息调度器dispatcher的HashMap中。剩下的几个消息调度器一样,注册相应的消息类型和其相对应的消息调度器。

之后dispatcher启动一个处理消息线程,可以在下面代码看到中央消息处理器处理消息。

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();   // 从中央消息处理器dispatcher的消息队列中获得消息
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);   // 分发该消息
          }
        }
      }
    };
  }
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();  //获取event的类型

    try{
      EventHandler handler = eventDispatchers.get(type);  // 根据event类型从HashMap中获取对应的消息处理器
      if(handler != null) {
        handler.handle(event);  // 使用获得的消息处理器处理这个event,跳转至相应的处理器实现
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

当Client提交一个Job之后,MRAppMaster创建一个job,并发送一个JOB_INIT事件给中央消息处理器dispatcher。下面我们以JOB_INIT事件为例,看看发生了什么。

首先dispatcher收到JOB_INIT事件之后,根据JOB_INIT的事件类型(JobEventType.class)获得事件的处理器应该是jobEventDispatcher,便将其发送给jobEventDispatcher。

private class JobEventDispatcher implements EventHandler<JobEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(JobEvent event) {
    ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);  // 由这个job处理这个event
  }
}
JobEventDispatcher处理这个JOB_INIT消息。之后进入JobImpl 的消息处理函数
public void handle(JobEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getJobId() + " of type "
        + event.getType());
  }
  try {
    writeLock.lock();
    JobStateInternal oldState = getInternalState();
    try {
       getStateMachine().doTransition(event.getType(), event);   // 获取job的当前的状态机,根据当前消息event进行状态变换。
    } catch (InvalidStateTransitonException e) {                 // 会跳转至 StateMachine 实现的 doTransition()函数。
      LOG.error("Can't handle this event at current state", e);
      addDiagnostic("Invalid event " + event.getType() + 
          " on Job " + this.jobId);
      eventHandler.handle(new JobEvent(this.jobId,
          JobEventType.INTERNAL_ERROR));
    }
    //notify the eventhandler of state change
    if (oldState != getInternalState()) {
      LOG.info(jobId + "Job Transitioned from " + oldState + " to "
               + getInternalState());
      rememberLastNonFinalState(oldState);
    }
  }
  
  finally {
    writeLock.unlock();
  }
}
MRAppMaster消息分发过程到此结束,这之后就进入了Job状态机的操作,当然根据任务的不同,也许会进入Task状态机,TaskAttempt状态机等等。总之,状态机以及状态机中的各种hook进行对应Job/Task/TaskAttempt的控制。

我们知道一个job对应了一个状态机,我们也应该知道在YARN的实现中一个状态机由以下三个部分组成: 1. 状态(节点)  2. 事件(弧)  3. Hook(触发事件后的处理)。

在JobImpl.java文件中,我们可以看到构建job状态机的过程:

protected static final
  StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
     stateMachineFactory
   = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
            (JobStateInternal.NEW)       // 构造JobImpl状态机,初始状态是 NEW 状态

        // Transitions from NEW state
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 添加一个状态变化
            JobEventType.JOB_DIAGNOSTIC_UPDATE,
            DIAGNOSTIC_UPDATE_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 添加一个状态变化
            JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
        .addTransition                                                      // 添加一个状态变化
            (JobStateInternal.NEW,
            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
            JobEventType.JOB_INIT,
            new InitTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
            JobEventType.JOB_KILL,
            new KillNewJobTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
            JobEventType.INTERNAL_ERROR,
            INTERNAL_ERROR_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
            JobEventType.JOB_AM_REBOOT,
            INTERNAL_REBOOT_TRANSITION)
            ...
            ...   
后面还有很多,job状态机是比较一个复杂的状态机,涉及到很多状态与事件,可以通过 对YARN状态机可视化深入了解,在此不做更多讨论。
调用addTransition()函数可以添加一个状态变化。addTransition()函数中有四个参数,分别为: 1.preState  2.postState  3.eventType 4.hook。  它们表示 1.事件发生前的状态,2.事件发生后的状态, 3.事件, 4.事件发生时触发的hook

代码中  addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()) 这一个状态变化表示,事件发生前是 NEW状态,事件发生后是 INITED状态 或者 FAILED状态,事件是JOB_INIT, hook是InitTransition对象。也就是如果Job状态机当前状态是NEW,这是发生了一个JOB_INIT事件,那么状态机会触发InitTransition对象中的doTransition()函数,如果transition()函数返回 INITED状态,那么状态机最新状态就是 INITED, 如果doTransition()函数返回 FAILED状态,那么状态机最新状态就是FAILED。

那么我们来看一下状态机的状态变换的过程。

private class InternalStateMachine
      implements StateMachine<STATE, EVENTTYPE, EVENT> {
  private final OPERAND operand;
  private STATE currentState;

  InternalStateMachine(OPERAND operand, STATE initialState) {
    this.operand = operand;
    this.currentState = initialState;
    if (!optimized) {
      maybeMakeStateMachineTable();
    }
  }

  @Override
  public synchronized STATE getCurrentState() {
    return currentState;
  }

  @Override
  public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
       throws InvalidStateTransitonException  {
    currentState = StateMachineFactory.this.doTransition                       // 发生状态机变化
        (operand, currentState, eventType, event);
    return currentState;
  }
}
private STATE doTransition
         (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitonException {
  // We can assume that stateMachineTable is non-null because we call
  //  maybeMakeStateMachineTable() when we build an InnerStateMachine ,
  //  and this code only gets called from inside a working InnerStateMachine .
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
    = stateMachineTable.get(oldState);                                // 根据当前状态获取所有该状态有可能发生的事件
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
        = transitionMap.get(eventType);                               // 根据事件获取对应的hook,为一个Transition对象(Transition是一个接口)
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);  // 调用该对象的doTransition()函数
    }
  }
  throw new InvalidStateTransitonException(oldState, eventType);
}
private class SingleInternalArc
                  implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState,
      SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState,
                            EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);          // 调用 hook 的 transition()函数
    }
    return postState;
  }
}
这个 stateMachineTable 是一个 Map<state, Map<eventType, Transition>> 数据结构。根据当前状态可以获取这个状态的所有<eventType, Transition>对应关系。之后根据event类型可以获得 Transition对象(也就是一个hook)。那么,状态机就可以由当前状态,事件调用hook函数了。本质上就是  StateMachine.hook(preState, event) 的形式,因为OO的思想发生了变形。之后就是Transition对象的具体实现了,由于在一开始构造状态机的时候,JobImpl 构造了一个状态变化 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()),这里的hook是 InitTransition 对象。那么hook被调用的时候,就会调用 InitTransition 对象的 transition() 函数。InitTransition 对象的 transition() 函数中包括了很多初始化一个job所做的操作,比如创建MapTask,创建ReduceTask,创建MapTask所需要的输入文件Split 等等,在这里就不做过多叙述,可以自行阅读相关文章。

这个就是YARN的状态机工作的基本流程以及如何与 MRAppMaster的消息分发机制相关联的过程。由于YARN相对于MRv1的诸多改进,现在MRAppMaster取代了过去MRv1中的JobTracker对Job与Task进行控制,结构更加清晰,模块化更加明显。总体而言,MRAppMaster是一个相对比较复杂的模块,需要进行更多的更加仔细分析。


如需转载请表明 转载自 http://blog.csdn.net/gjt19910817/article/details/43441801



郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。