Hadoop Yarn解决多类应用兼容方法

1. Yarn应用兼容

Hadoop Yarn框架是Hadoop 2.0以上的新一代计算框架,在它上面可以跑多类应用,不仅能够跑经典MR计算,还能在该框架上实现实时计算,交互式计算,流式计算等等。正因为这样,Yarn得到了前所未有的关注,发展速度也比MRv1快。MRv1耗时有8年才使其稳定,而Yarn才发展了2年,就已经被许多公司采用。

 

既然Hadoop能在Yarn上实现多类应用框架,那么也可以这么说,它能够同时在Yarn上运行多类应用。这就会遇到Yarn对各类应用的同时支持问题。

 

那么它是怎么做到这样完成各类应用的兼容呢?

 

我在有一篇文章讲解如何在Yarn上实现一个非Map/Reduce应用:Hadoop Yarn上实现Hama BSP计算应用。我们接下来讲一讲在Hadoop Yarn上,它是如何解决各类应用框架的兼容性问题。在遇到多应用综合编程中,这种思路我们值得借鉴。

 

在Hama BSP Yarn应用中,无论是客户端client还是appmaster实现都比较简单,因此它比较容易学。也正因为如此,它就没有跟上已有MRv2实现框架的发展,没有复用MRv2提供的公共API。在这方面,Tez就做的比Hama好些。

 

下面我们从Tez应用代码中学习Yarn是如何解决兼容性问题。

 

hadoop有一个Yarn参数mapreduce.framework.name用来控制你选择的应用框架。在MRv2里,mapreduce.framework.name有两个值:local和yarn。前者是采用本地模式执行计算任务,后者是采用Yarn框架把任务提交到集群中执行。

 2. Yarn Cluster集群初始化

无论是Map/Reduce还是Tez,客户端提交任务都需要先创建一个ClientProtocol接口的实现类。该接口在MRv1中是通过服务端Jobtracker来实现它;而在Yarn和Tez中,它在客户端YARNRunner来实现。你在提交Job时会创建一个JobClient对象,JobClient会初始化Map/Reduce集群Cluster信息。实质上提交Job时所需要的集群Cluster信息就是resourceManager的连接信息,MRv1里面初始化的是JobTracker的连接信息。如下代码是org.apache.hadoop.mapreduce.Cluster类的部分代码。

 

 

package org.apache.hadoop.mapreduce;
// 省去一些import 信息
/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {
  
  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
  
  private ClientProtocolProvider clientProtocolProvider;  //该客户端协议的提供者,MRv2的提供类为LocalClientProtocolProvider, YarnClientProtocol;而Tez的提供类为YarnTezClientProtocolProvider。
  private ClientProtocol client; //生成的是YarnRunner对象
  
  //省略一些类属性...
  //利用ServiceLoader去加载ClientProtocolProvider的继承类,这是JDK提供的功能:多个服务提供者可以实现相同的接口去实现不同的逻辑,这需要按照一定的策略加载相关的文件配置。ServiceLoader实现了Iterable接口,可以直接迭代访问,采用Lazy加载的方式。
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
 
  // 略去一些构造函数 ...

// client端JobClient调用Cluster构造函数去初始化集群信息
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException { 
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);   
  }
  //初始化集群信息,主要是建立client与JT/RM的连接
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      //for循环访问产生一个ClientProtocolProvider实现,frameWorkLoader里面的访问对象是根据在classpath的加载顺序来访问的,因此其加载顺序的不同会获得不一样的对象。
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);   // 创建YarnRunner对象,建立与RM的连接
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);//给定地址创建
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;  //赋给属性供下次使用
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;  //一旦得到一个新的对象(该对象不为NULL)就退出
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider -  returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
 // 省略...
}
 

 

 从Cluster类中,可以看到Java提供的类ServiceLoader。ServiceLoader是服务加载类,它根据文件配置来在java classpath环境中加载对应接口的实现类。

 

 其配置规定在 classpath中需要把接口名命名成文件名,并把该文件放到classpath环境的META-INF/services文件中。例如:org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider是MRv2提供的接口,那么在Tez部署的client jar包中会有一个相对路径文件名META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider,里面写着Tez的ClientProtocolProvider实现类的限定名:org.apache.tez.mapreduce.YarnTezClientProtocolProvider。

ServiceLoader方法会自动去在classpath环境变量里面按需按序加载这些实现类。

3. ClientProtocolProvider客户端协议提供类

ClientProtocolProvider方法提供两个create方法和一个close方法。create方法用来创建clientProtocol实例,而close用于关闭实例。org.apache.tez.mapreduce.YarnTezClientProtocolProvider是实现代码如下:

package org.apache.tez.mapreduce;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.tez.mapreduce.hadoop.MRConfig;

public class YarnTezClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    //根据mapreduce.framework.name的值来比较你需要的值:FRAMEWORK_NAME=mapreduce.framework.name,YARN_TEZ_FRAMEWORK_NAME=yarn-tez。如果conf里面的参数值是yarn-tez那么就创建一个Tez应用的org.apache.tez.mapreduce.YARNRunner对象。
    if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    // nothing to do
  }

}
MRv2的org.apache.hadoop.mapred.YarnClientProtocolProvider的实现逻辑与Tez的一样,都是创建各自的YarnRunner。 MRv2还实现了一个本地运行的ClientProtocolProvider:org.apache.hadoop.mapred.LocalClientProtocolProvider,其就是创建一个org.apache.hadoop.mapred.LocalJobRunner对象,使其初始化本地环境而不需要建立socket与RM通信。 结合Cluster.initialize()方法和ClientProtocolProvider.create()代码中可以看到,它根据mapreduce.framework.name参数来确定你的Job采用哪类应用。各自的应用只需要实现相应的接口就可以把自己的Job运行在Yarn上。

4. 向Yarn提交Job运行类 YarnRunner 

ClientProtocolProvider用来构造ClientProtocol实现类YarnRunner,它直接产生一个RM代理ResourceMgrDelegate。在MRv2中,ResourceMgrDelegate继承了YarnClientImpl抽象类(同时YarnClientImpl实现了YarnClient接口)通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job运行状态等操作。

实际了上,只需要能过YarnClient.createYarnClient()静态方法就可以得到YarnClientImpl对象。

PS:Hadoop 的开发版发展速度出奇地快。因此如果想获得hadoop最新进展,最好经常把trunk代码更新下来。 其MRv2的API可以兼容老版的API,接口还是比较稳定,对原有代码的支持度都比较高,但是从其底层实现细节还正在处于不断地进化中。也正是其更新速度快,往往其它依赖Hadoop Yarn的生态系统出现一种滞后状态,例如Tez,Hama等系统,从它们的代码依赖里可以看出这种差别很明显,从它们的实现逻辑上都可以看出端倪。现在Hadoop 已经进入了2.1.0 beta开发阶段,相信在不久的将来,Hadoop 2.0 Stable版本就要发布了。我们拭目以待。

 

 

 

 

 

 

 

 

 

 

猜你喜欢

转载自zcdeng.iteye.com/blog/1897116