在yarn 里面Container 是可选的.本文介绍以下两种:
- DefaultContainerExecutor
- LinuxContainerExecutor
由配置参数控制: yarn.nodemanager.container-executor.class
有NodeManager初始化的时候,进行加载
org.apache.hadoop.yarn.server.nodemanager.NodeManager#serviceInit
// todo 始化ContainerExecutor,ContainerExecutor封装了nodeManager对Container操作的各种方法,
// todo 包括启动container, 查询指定id的container是否活着,等操作. 根据配置yarn.nodemanager.container-executor.class
// todo 决定ContainerExecutor的实例, 默认为DefaultContainerExecutor.
ContainerExecutor exec = createContainerExecutor(conf);
try {
exec.init(context);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize container executor", e);
}
DeletionService del = createDeletionService(exec);
addService(del);
@VisibleForTesting
protected ContainerExecutor createContainerExecutor(Configuration conf) {
return ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
}
即: 默认是 DefaultContainerExecutor .
在讲解之前,先看一下他们的父类.
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor
类图:
所有的ContainerExecutor 都会继承这个类.
这个类中比较重要的方法如下:
setConf:设置配置文件:
init : 初始化
startLocalizer: 为此应用程序中的容器准备执行环境。
prepareContainer : 在编写启动环境之前准备容器。
launchContainer: 在节点上启动容器。 这是一个阻塞调用,仅在容器退出时返回。
relaunchContainer: 重新启动节点上的容器。 这是一个阻塞调用,仅在容器退出时返回。
signalContainer: 具有指定信号的信号容器。
isContainerAlive: 检查容器是否存活
reacquireContainer: 恢复已存在的容器。 这是一个阻塞调用,仅在容器退出时返回。 请注意,在此调用之前必须已激活容器。
writeLaunchEnv : 写入启动环境 默认容器启动脚本。
readDirAsUser: 读取用户目录
getRunCommand: 获取运行命令
isContainerActive: 容器是否存活
activateContainer: 标记容器为活跃状态
pauseContainer: 暂停容器,默认实现是 kill, 可以自定义
getProcessId: 根据容器ID获取进程ID
enum Signal : 信号枚举
NULL(0, "NULL"),
QUIT(3, "SIGQUIT"),
KILL(9, "SIGKILL"),
TERM(15, "SIGTERM");
DelayedProcessKiller : 根据信号, 杀死进程的类. (这个是杀死进程的类,具体情况还没想清楚.)
接下来,我们分别进行讲解:
DefaultContainerExecutor
DefaultContainerExecutor,简称DCE。每个Container运行在单独的进程里,但进程都是由NM的用户启动的。比如NM进程是用yarn用户启动的,那么所有Container的进程也由yarn用户启动。
在ContainerExecutor启动一个Container的过程中,涉及到了三个脚本,它们分别是:
- default_container_executor.sh
- default_container_executor_session.sh
- launch_container.sh
这三个脚本,都是跟Container相关的,所以它们都被放在一个Container所代表的目录结构下。
在NodeManager中,会为每个Application,以及每个Container建立一个对应的目录,在每个Container的目录下,就放置了一些运行这个Container必需的信息。
一般来说,这些目录是位于/tmp这个目录下,并且会在一个Application完成后,被删除。减少磁盘空间的消耗。
我们分别查看一下,上面我们所说的那三个脚本文件的内容。
default-container_executor.sh:
我们可以看到,在这个脚本文件的内部,会启动default_container_executor_session.sh这个脚本,
并将执行结果写入到这个Container的一个名为Container ID+pid.exitcode的文件中。
而default_container_executor_session.sh这个脚本呢?
我们可以看到,它主要是启动launch_container.sh这个脚本。
而我们可以看到,launch_container.sh中,
就负责运行相应的Container,也能是MRAppMaster,也可能是Mapper或者Reducer:
在launch_container.sh中,设置了很多环境变量。
这里因为我查看了一个ApplicationMaster的Container,所以启动的是MRAppMaster。
那么,DefaultContainerExecutor应该就是首先执行default_container_executor.sh这个脚本,对吧?
嗯嗯,没错的。
接下来是代码:
startLocalizer: 设置初始化操作
/**
* todo 设置初始化操作 ,需要传入初始化对象
* todo LocalizerStartContext : 封装启动本地化程序所需的信息。
*
* @param ctx LocalizerStartContext that encapsulates necessary information
* for starting a localizer.
* @throws IOException
* @throws InterruptedException
*/
@Override
public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
Path nmPrivateContainerTokensPath = ctx.getNmPrivateContainerTokens();
//todo 获取 nodeManger 通讯地址
InetSocketAddress nmAddr = ctx.getNmAddr();
String user = ctx.getUser();
String appId = ctx.getAppId();
String locId = ctx.getLocId();
LocalDirsHandlerService dirsHandler = ctx.getDirsHandler();
//todo 本地文件目录
List<String> localDirs = dirsHandler.getLocalDirs();
//todo 本地日志文件目录
List<String> logDirs = dirsHandler.getLogDirs();
//todo 初始化特定用户的本地目录。 localDirs
// create $local.dir/usercache/$user and its immediate parent
createUserLocalDirs(localDirs, user);
//todo 创建用户缓存目录.
// create $local.dir/usercache/$user/appcache
// create $local.dir/usercache/$user/filecache
createUserCacheDirs(localDirs, user);
//todo 创建App 目录
// create $local.dir/usercache/$user/appcache/$appId
createAppDirs(localDirs, user, appId);
//todo 创建爱你App 日志目录
// create $log.dir/$appid
createAppLogDirs(appId, logDirs, user);
// randomly choose the local directory
// todo 创建工作目录
// 从本地存储目录列表中返回随机选择的应用程序目录。 选择目录的概率与其大小成比例。
Path appStorageDir = getWorkingDir(localDirs, user, appId);
String tokenFn =
String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
//todo 复制文件
copyFile(nmPrivateContainerTokensPath, tokenDst, user);
LOG.info("Copying from " + nmPrivateContainerTokensPath
+ " to " + tokenDst);
FileContext localizerFc =
FileContext.getFileContext(lfs.getDefaultFileSystem(), getConf());
localizerFc.setUMask(lfs.getUMask());
localizerFc.setWorkingDirectory(appStorageDir);
LOG.info("Localizer CWD set to " + appStorageDir + " = "
+ localizerFc.getWorkingDirectory());
ContainerLocalizer localizer =
createContainerLocalizer(user, appId, locId, localDirs, localizerFc);
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
launchContainer: 启动Container
这个比较核心, 其实就是构建一个ShellCommandExecutor, 执行 shell 启动命令, 启动脚本....
/**
* todo 启动Container
*
* @param ctx Encapsulates information necessary for launching containers.
* @return
* @throws IOException
* @throws ConfigurationException
*/
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
Container container = ctx.getContainer();
Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
String user = ctx.getUser();
Path containerWorkDir = ctx.getContainerWorkDir();
List<String> localDirs = ctx.getLocalDirs();
List<String> logDirs = ctx.getLogDirs();
FsPermission dirPerm = new FsPermission(APPDIR_PERM);
ContainerId containerId = container.getContainerId();
// todo 在所有磁盘上创建 container 目录
String containerIdStr = containerId.toString();
String appIdStr =
containerId.getApplicationAttemptId().
getApplicationId().toString();
for (String sLocalDir : localDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(appCacheDir, appIdStr);
Path containerDir = new Path(appDir, containerIdStr);
createDir(containerDir, dirPerm, true, user);
}
// todo 在所有硬盘上创建 log 目录
createContainerLogDirs(appIdStr, containerIdStr, logDirs, user);
//todo 创建临时文件目录: ./tmp
Path tmpDir = new Path(containerWorkDir,
YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
createDir(tmpDir, dirPerm, false, user);
// todo copy container tokens to work dir
Path tokenDst =
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
copyFile(nmPrivateTokensPath, tokenDst, user);
// todo copy launch script to work dir
Path launchDst =
new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
copyFile(nmPrivateContainerScriptPath, launchDst, user);
// Create new local launch wrapper script
// todo 创建新的本地启动包装器脚本
LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder(
containerIdStr, containerWorkDir);
// Fail fast if attempting to launch the wrapper script would fail due to
// Windows path length limitation.
if (Shell.WINDOWS &&
sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
throw new IOException(String.format(
"Cannot launch container using script at path %s, because it exceeds " +
"the maximum supported path length of %d characters. Consider " +
"configuring shorter directories in %s.", sb.getWrapperScriptPath(),
WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
}
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
//todo 获取pidFile , 写入启动脚本
sb.writeLocalWrapperScript(launchDst, pidFile);
} else {
LOG.info("Container " + containerIdStr
+ " pid file not set. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
// create log dir under app
// fork script
Shell.CommandExecutor shExec = null;
try {
setScriptExecutable(launchDst, user);
setScriptExecutable(sb.getWrapperScriptPath(), user);
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
containerIdStr, user, pidFile, container.getResource(),
new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment());
//todo containerId 如果存活的话, 启动命令
if (isContainerActive(containerId)) {
// todo ------------- 启动 start ----------------------------------
shExec.execute();
// todo ------------- 启动 end ----------------------------------
} else {
LOG.info("Container " + containerIdStr +
" was marked as inactive. Returning terminated error");
return ExitCode.TERMINATED.getExitCode();
}
} catch (IOException e) {
if (null == shExec) {
return -1;
}
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the
// container-executor's output
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
StringBuilder builder = new StringBuilder();
builder.append("Exception from container-launch.\n");
builder.append("Container id: ").append(containerId).append("\n");
builder.append("Exit code: ").append(exitCode).append("\n");
if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
builder.append("Exception message: ");
builder.append(e.getMessage()).append("\n");
}
if (!shExec.getOutput().isEmpty()) {
builder.append("Shell output: ");
builder.append(shExec.getOutput()).append("\n");
}
String diagnostics = builder.toString();
logOutput(diagnostics);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
"Container killed on request. Exit code is " + exitCode));
}
return exitCode;
} finally {
if (shExec != null) shExec.close();
}
return 0;
}
同时,我们也可以看到,这个实现有一些问题,即,对于资源隔离做的并不好。
全部Container都是由运行NodeManager的那个用户启动的。
LinuxContainerExecutor
LinuxContainerExecutor,简称LCE。每个Container由不同的用户启动。比如A用户提交的job的container,都由A用户启动。此外支持cgroup、支持单独的配置文件、支持简单的ACL。
LCE明显隔离性更好,但有一些限制:
- 需要linux native程序支持。准确的说是一个container-executor程序,用C写的,代码见hadoop-yarn-project\hadoop-yarn\hadoop-yarn-server\hadoop-yarn-server-nodemanager\src\main\native\container-executor。编译hadoop时务必同时编译container-executor。container-executor的路径由属性
yarn.nodemanager.linux-container-executor.path
指定。 - container-executor还需要一个配置文件container-executor.cfg。而且这个配置文件和container-executor的二进制文件相对路径是固定的。默认情况下container-executor会去
../etc/hadoop
路径下寻找配置文件,找不到的话会报错。可以在编译hadoop时指定:mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=../../conf
。不知道为何要这样设计。 - 由于用不同的用户启动Container,所以必须有对应的Linux用户存在。否则会抛异常。这带来一些管理上的麻烦,比如新增一个用户B时,必须在所有NM节点上执行
useradd B
。 - container-executor和container-executor.cfg的所有者必须是root。而且他们所在的目录一直上溯到/,所有者也必须是root。所以我们一般把这两个文件放在/etc/yarn下。
- container-executor文件的权限必须是
6050 or --Sr-s---
,因为它的原理就是setuid/setgid。group owner必须和启动NM的用户同组。比如NM由yarn用户启动,yarn用户属于hadoop组,那container-executor必须也是hadoop组。
在3.2.0版本 LCEResourcesHandler 即将注销掉.
* @see LinuxContainerRuntime
* @see DelegatingLinuxContainerRuntime
* @see DefaultLinuxContainerRuntime
* @see DockerLinuxContainerRuntime
* @see DockerLinuxContainerRuntime#isDockerContainerRequested
内存隔离
YARN对内存其实没有真正隔离,而是监视Container进程的内存使用,超出限制后直接杀掉进程。相关逻辑见ContainersMonitorImpl类。
进程监控的逻辑见ProcfsBasedProcessTree类,原理就是读取/proc/$pid下面的文件,获得进程的内存占用。
具体的逻辑没详细看,还有点复杂的。
CPU隔离
YARN在默认情况下,完全没有考虑CPU的隔离,即使用了LCE。
所以如果某个任务是CPU密集型的,可能消耗掉整个NM的CPU。
(跟具体的应用有关。对MR而言,最多用满一个核吧。)
cgroup
YARN支持cgroup隔离CPU资源:YARN-3。
cgroup必须要LCE,但默认情况下没有开启。可以设置属性yarn.nodemanager.linux-container-executor.resources-handler.class
为org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler以开启。
关于cgroup还有很多属性可以调整,见yarn-default.xml中的配置。
localize过程
研究ContainerExecutor的过程中,发现了这个东西,研究的痛不欲生。。。
这其实就是类似于以前的distributed cache,但是YARN做的更通用了。
主要是分发container运行需要的所有文件,包括一些lib、token等等。
这个过程称为localize,由ResourceLocalizationService类负责。
分几步:
- 建相关目录。$local.dir/usercache/$user/filecache,用于暂存用户可见的distributed cache;$local.dir/usercache/$user/appcache/$appid/filecache,用于暂存app可见的distributed cache;$log.dir/$appid/$containerid,用于暂存日志。我这里只列出了最深一级目录,父目录不存在也会新建。对DCE而言,直接用java代码建这些目录。对于LCE,调用container-executor建目录,见上文container-executor的Usage。注意这些目录会在所有磁盘上建(我们的节点一般是12块盘,就建12次),但只有一个会被真正使用。
- 将token文件写到$local.dir/usercache/$user/appcache/$appid目录。这里有bug,无论DCE还是LCE,都会将token文件写到第一个local-dir,所以可能会有竞争,导致后续container启动失败。见YARN-2566、YARN-2623。
- 对于DCE,直接new一个ContainerLocalizer对象,调用runLocalization方法。这个方法的作用是从ResourceLocalizationService处获取要分发的文件的URI,并下载到本地。对于LCE,会单独启动一个JVM进程,通过RPC协议LocalizationProtocol与ResourceLocalizationService通信。功能是一样的。
在这里, 关于 LinuxContainerExecutor 部分的代码 ,hadoop3.2.0版本貌似要换用心的版本.等以后整理完,再补充..................
有好的资料,或者文章麻烦推荐一下,对应的资料缺失蛮少.
参考连接: