共计 15562 个字符,预计需要花费 39 分钟才能阅读完成。
上一篇浅析了 Hadoop 心跳机制的 TT(TaskTracker)方面(http://www.linuxidc.com/Linux/2014-06/103216.htm),这一篇浅析下 JT(JobTracker)方面。
我们知道心跳是 TT 通过 RPC 请求调用 JT 的 heartbeat() 方法的,TT 在调用 JT 的 heartbeat 回收集自身的状态信息封装到 TaskTrackerStatus 对象中,传递给 JT。下面看看 JT 如何处理来自 TT 的心跳。
目录 :
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm
1.JobTracker.heartbeat():
// Make sure heartbeat is from a tasktracker allowed by the jobtracker.
if (!acceptTaskTracker(status)) {
throw new DisallowedTaskTrackerException(status);
}
第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
2.JobTracker.heartbeat():
String trackerName = status.getTrackerName();
long now = clock.getTime();
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
}
第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
2.JobTracker.heartbeat():
String trackerName = status.getTrackerName();
long now = clock.getTime();
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
}
这一步是检查 TT 是否重启,是重启的话标识该 TT 的状态为健康的,否则检查 TT 的健康状态。faultyTrackers.markTrackerHealthy(status.getHost()) 内部将该 TT 所在的 Host 上所有的 TT(从这里可以看出 hadoop 考虑到一个 Host 上可能存在多个 TT 的可能)从黑名单,灰名单和可能存在错误的列表上删除,也就是从 potentiallyFaultyTrackers 队列中移除该 Host,通过更新 JT 的 numGraylistedTrackers/numBlacklistedTrackers 数量以及 JT 的 totalMapTaskCapacity 和 totalReduceTaskCapacity 数量。至于如何检查 TT 健康状态,具体是根据 JT 上记录的关于 TT 执行任务失败的次数来判断的(具体不是太理解)。
————————————– 分割线 ————————————–
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
————————————– 分割线 ————————————–
3.JobTracker.heartbeat():
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;
if (initialContact != true) {
// If this isn’t the ‘initial contact’ from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the ‘previous heartbeat’; if so, ask the
// tasktracker to re-initialize itself.
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
if (hasRestarted()) {
addRestartInfo = true;
// inform the recovery manager about this tracker joining back
recoveryManager.unMarkTracker(trackerName);
} else {
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
“heartbeat for ‘” + trackerName +
“‘; reinitializing the tasktracker”);
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
} else {
// It is completely safe to not process a ‘duplicate’ heartbeat from a
// {@link TaskTracker} since it resends the heartbeat when rpcs are
// lost see {@link TaskTracker.transmitHeartbeat()};
// acknowledge it by re-sending the previous response to let the
// {@link TaskTracker} go forward.
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
trackerName + “‘; resending the previous ‘lost’ response”);
return prevHeartbeatResponse;
}
}
}
此处第一句从 JT 记录的 HeartbeatResponse 队列中获取该 TT 的 HeartbeatResponse 信息,即判断 JT 之前是否收到过该 TT 的心跳请求。如果 initialContact!=true,表示 TT 不是首次连接 JT,同时如果 prevHeartbeatResponse==null,根据注释可以知道如果 TT 不是首次连接 JT,而且 JT 中并没有该 TT 之前的心跳请求信息,表明 This is the first heartbeat from the old tracker to the newly started JobTracker。判断 hasRestarted 是否为 true,hasRestarted 是在 JT 初始化(initialize() 方法)时,根据 recoveryManager 的 shouldRecover 来决定的,hasRestarted=shouldRecover,所以当需要进行 job 恢复时,addRestartInfo 会被设置为 true,即需要 TT 进行 job 恢复操作,同时从 recoveryManager 的 recoveredTrackers 队列中移除该 TT。如果不需要进行任务恢复,则直接返回 HeartbeatResponse,并对 TT 下重新初始化指令(后期介绍),注意此处返回的 responseId 还是原来的 responseId,即 responseId 不变。上面说的都是 prevHeartbeatResponse==null 时的情况,下面说说 prevHeartbeatResponse!=null 时如何处理,当 prevHeartbeatResponse!=null 时会直接返回 prevHeartbeatResponse,而忽略本次心跳请求。
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103217p2.htm
4.JobTracker.heartbeat():
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
status.setLastSeen(now);
if (!processHeartbeat(status, initialContact, now)) {
if (prevHeartbeatResponse != null) {
trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
首先将 responseId+1,然后记录心跳发送时间。接着来看看 processHeartbeat() 方法。
5.JobTracker.processHeartbeat():
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
根据该 TT 的上一次心跳发送的状态信息更新 JT 的一些信息,如 totalMaps,totalReduces,occupiedMapSlots,occupiedReduceSlots 等,接着根据本次心跳发送的 TT 状态信息再次更新这些变量。
6.JobTracker.processHeartbeat():
TaskTracker taskTracker = getTaskTracker(trackerName);
if (initialContact) {
// If it’s first contact, then clear out
// any state hanging around
if (seenBefore) {
lostTaskTracker(taskTracker);
}
} else {
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
LOG.warn(“Status from unknown Tracker : ” + trackerName);
updateTaskTrackerStatus(trackerName, null);
return false;
}
}
如果该 TT 是首次连接 JT,且存在 oldStatus,则表明 JT 丢失了 TT,具体意思应该是 JT 在一段时间内与 TT 失去了联系,之后 TT 恢复了,所以发送心跳时显示首次连接。lostTaskTracker(taskTracker):会将该 TT 从所有的队列中移除,并将该 TT 上记录的 job 清除掉 (kill 掉),当然对那些已经完成的 Job 不会进行次操作。当 TT 不是首次连接到 JT,但是 JT 却没有该 TT 的历史 status 信息,则表示 JT 对该 TT 未知,所以重新更新 TaskTracker 状态信息。
7.JobTracker.processHeartbeat():
updateTaskStatuses(trackerStatus);
updateNodeHealthStatus(trackerStatus, timeStamp);
更新 Task 和 NodeHealth 信息,较复杂。
8.JobTracker.heartbeat():如果 processHeartbeat() 返回 false,则返回 HeartbeatResponse(),并下达重新初始化 TT 指令。
// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
if (taskTrackerStatus == null) {
LOG.warn(“Unknown task tracker polling; ignoring: ” + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null) {
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()) {
LOG.debug(trackerName + ” -> LaunchTask: ” + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}
}
}
此处会实例化一个 HeartbeatResponse 对象,作为本次心跳的返回值,在初始化一个 TaskTrackerAction 队列,用于存放 JT 对 TT 下达的指令。首先需要判断 recoveryManager 的 recoveredTrackers 是否为空,即是否有需要回复的 TT,然后根据 TT 心跳发送的 acceptNewTasks 值,即表明 TT 是否可接收新任务,并且该 TT 不在黑名单中,同上满足以上条件,则 JT 可以为 TT 分配任务。分配任务的选择方式是优先 CleanipTask,然后是 SetupTask,然后才是 Map/Reduce Task。下面来看下 getSetupAndCleanupTasks() 方法。
9.JobTracker.getSetupAndCleanupTasks():
// Don’t assign *any* new task in safemode
if (isInSafeMode()) {
return null;
}
如果集群处于 safe 模式,则不分配任务。
int maxMapTasks = taskTracker.getMaxMapSlots();
int maxReduceTasks = taskTracker.getMaxReduceSlots();
int numMaps = taskTracker.countOccupiedMapSlots();
int numReduces = taskTracker.countOccupiedReduceSlots();
int numTaskTrackers = getClusterStatus().getTaskTrackers();
int numUniqueHosts = getNumberOfUniqueHosts();
计算 TT 的最大 map/reduce slot,以及已占用的 map/reduce slot,以及集群可使用的 TT 数量,和集群的 host 数量。
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
numUniqueHosts, true);
if (t != null) {
return Collections.singletonList(t);
}
}
上一篇浅析了 Hadoop 心跳机制的 TT(TaskTracker)方面(http://www.linuxidc.com/Linux/2014-06/103216.htm),这一篇浅析下 JT(JobTracker)方面。
我们知道心跳是 TT 通过 RPC 请求调用 JT 的 heartbeat() 方法的,TT 在调用 JT 的 heartbeat 回收集自身的状态信息封装到 TaskTrackerStatus 对象中,传递给 JT。下面看看 JT 如何处理来自 TT 的心跳。
目录 :
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——TT 篇 http://www.linuxidc.com/Linux/2014-06/103216.htm
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——JT 篇 http://www.linuxidc.com/Linux/2014-06/103217.htm
Hadoop1.2.1 源码解析系列:JT 与 TT 之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm
1.JobTracker.heartbeat():
// Make sure heartbeat is from a tasktracker allowed by the jobtracker.
if (!acceptTaskTracker(status)) {
throw new DisallowedTaskTrackerException(status);
}
第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
2.JobTracker.heartbeat():
String trackerName = status.getTrackerName();
long now = clock.getTime();
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
}
第一步是检查发送心跳请求的 TT 是否属于可允许的 TT,这个是根据一个 HostsFileReader 对象进行判断的,该对象是在实例化 JT 的时候创建的,这个类保存了两个队列,分别是 includes 和 excludes 队列,includes 表示可以访问的 host 列表,excludes 表示不可访问的 host 列表,这两个列表的内容根据两个 mapred.hosts 和 mapred.hosts.exclude(mapred-site,xml 中,默认是 null)这两个参数指定的文件名读取的。具体可参考 JT 源码 1956 行。
2.JobTracker.heartbeat():
String trackerName = status.getTrackerName();
long now = clock.getTime();
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
}
这一步是检查 TT 是否重启,是重启的话标识该 TT 的状态为健康的,否则检查 TT 的健康状态。faultyTrackers.markTrackerHealthy(status.getHost()) 内部将该 TT 所在的 Host 上所有的 TT(从这里可以看出 hadoop 考虑到一个 Host 上可能存在多个 TT 的可能)从黑名单,灰名单和可能存在错误的列表上删除,也就是从 potentiallyFaultyTrackers 队列中移除该 Host,通过更新 JT 的 numGraylistedTrackers/numBlacklistedTrackers 数量以及 JT 的 totalMapTaskCapacity 和 totalReduceTaskCapacity 数量。至于如何检查 TT 健康状态,具体是根据 JT 上记录的关于 TT 执行任务失败的次数来判断的(具体不是太理解)。
————————————– 分割线 ————————————–
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
————————————– 分割线 ————————————–
3.JobTracker.heartbeat():
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;
if (initialContact != true) {
// If this isn’t the ‘initial contact’ from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the ‘previous heartbeat’; if so, ask the
// tasktracker to re-initialize itself.
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
if (hasRestarted()) {
addRestartInfo = true;
// inform the recovery manager about this tracker joining back
recoveryManager.unMarkTracker(trackerName);
} else {
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn(“Serious problem, cannot find record of ‘previous’ ” +
“heartbeat for ‘” + trackerName +
“‘; reinitializing the tasktracker”);
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}
} else {
// It is completely safe to not process a ‘duplicate’ heartbeat from a
// {@link TaskTracker} since it resends the heartbeat when rpcs are
// lost see {@link TaskTracker.transmitHeartbeat()};
// acknowledge it by re-sending the previous response to let the
// {@link TaskTracker} go forward.
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info(“Ignoring ‘duplicate’ heartbeat from ‘” +
trackerName + “‘; resending the previous ‘lost’ response”);
return prevHeartbeatResponse;
}
}
}
此处第一句从 JT 记录的 HeartbeatResponse 队列中获取该 TT 的 HeartbeatResponse 信息,即判断 JT 之前是否收到过该 TT 的心跳请求。如果 initialContact!=true,表示 TT 不是首次连接 JT,同时如果 prevHeartbeatResponse==null,根据注释可以知道如果 TT 不是首次连接 JT,而且 JT 中并没有该 TT 之前的心跳请求信息,表明 This is the first heartbeat from the old tracker to the newly started JobTracker。判断 hasRestarted 是否为 true,hasRestarted 是在 JT 初始化(initialize() 方法)时,根据 recoveryManager 的 shouldRecover 来决定的,hasRestarted=shouldRecover,所以当需要进行 job 恢复时,addRestartInfo 会被设置为 true,即需要 TT 进行 job 恢复操作,同时从 recoveryManager 的 recoveredTrackers 队列中移除该 TT。如果不需要进行任务恢复,则直接返回 HeartbeatResponse,并对 TT 下重新初始化指令(后期介绍),注意此处返回的 responseId 还是原来的 responseId,即 responseId 不变。上面说的都是 prevHeartbeatResponse==null 时的情况,下面说说 prevHeartbeatResponse!=null 时如何处理,当 prevHeartbeatResponse!=null 时会直接返回 prevHeartbeatResponse,而忽略本次心跳请求。
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-06/103217p2.htm
首先获取 Job 的 Cleanup 任务,每个 Job 有两个 Cleanup 任务,分别是 map 和 reduce 的。
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
t = job.obtainTaskCleanupTask(taskTracker, true);
if (t != null) {
return Collections.singletonList(t);
}
}
然后获取一个 Cleanup 任务的 TaskAttempt。
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
numUniqueHosts, true);
if (t != null) {
return Collections.singletonList(t);
}
}
然后在获取 Job 的 setup 任务。上面这三个全部是获取的 map 任务,而下面是获取 reduce 任务,方法基本一样。
如果该方法返回 null,则表示没有 cleanup 或者 setup 任务需要执行,则执行 map/reduce 任务。
10.JobTracker.heartbeat():
if (tasks == null) {
tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
此处是使用 TaskScheduler 调度任务,一大难点,后期分析。
11.JobTracker.heartbeat():
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
if(LOG.isDebugEnabled()) {
LOG.debug(trackerName + ” -> LaunchTask: ” + task.getTaskID());
}
actions.add(new LaunchTaskAction(task));
}
}
生成一个 LaunchTaskAction 指令。
// Check for tasks to be killed
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
actions.addAll(killTasksList);
}
// Check for jobs to be killed/cleanedup
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
actions.addAll(killJobsList);
}
// Check for tasks whose outputs can be saved
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}
以上分别是下达 kill task 指令,kill/cleanedup job 指令,commit task 指令。以上四种指令,加上一个 ReinitTackerAction,这是心跳 JT 对 TT 下达的所有五种指令,以后可以相信对其进行分析。
12.JobTracker.heartbeat():
// calculate next heartbeat interval and put in heartbeat response
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
// check if the restart info is req
if (addRestartInfo) {
response.setRecoveredJobs(recoveryManager.getJobsToRecover());
}
// Update the trackerToHeartbeatResponseMap
trackerToHeartbeatResponseMap.put(trackerName, response);
// Done processing the hearbeat, now remove ‘marked’ tasks
removeMarkedTasks(trackerName);
剩下一些收尾工作,如计算下次发送心跳的时间,以及设置需要 TT 进行恢复的任务,更新 trackerToHeartbeatResponseMap 队列,移除标记的 task。最后返回 HeartbeatResponse 对象,完成心跳请求响应。
到此 JT 的 heartbeat() 完成了,中间很多地方比较复杂,都没有去深追,以后有时间可以继续研究,如有错误,请不吝指教,谢谢
更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13