YARN ResourceManager failover mechanism

 

Before the RM (ResourceManager) starts serving external requests, it first initializes itself and performs a recovery pass; only after that does the RM actually start and begin serving.

The RM startup flow is shown in the figure below:

[Figure: RM startup flow diagram]

The services inside the RM include:

(1) ContainerAllocationExpirer: monitors whether allocated containers have expired.

(2) AmLivelinessMonitor: monitors the liveness of applications (their ApplicationMasters).

(3) NodesListManager: manages the node list; nodes can be dynamically added to or removed from the cluster.

(4) NMLivelinessMonitor: monitors whether each NodeManager is alive; by default, a NodeManager that has not reported a heartbeat for 10 minutes is considered failed.

(5) ResourceTrackerService: implements the ResourceTracker protocol and is mainly responsible for managing NodeManagers, e.g. registering new NodeManagers and removing dead ones.

(6) ApplicationMasterService: implements the AMRMProtocol and is responsible for interacting with ApplicationMasters, receiving their requests and responding to them.

(7) ClientRMService: implements the ClientRMProtocol and is responsible for interacting with clients, receiving client requests and responding to them.

(8) AdminService: implements the RMAdminProtocol and is mainly responsible for system-wide administration, e.g. which clients may rename queues in the system or add resources to particular queues.

(9) ApplicationMasterLauncher: manages the launch and termination of ApplicationMasters.
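To connect the startup flow above with the services just listed, here is a rough, hypothetical sketch (the types below are made up for illustration and are not the real YARN classes): the RM behaves like a container of services that initializes them, replays recovered state, and only then starts serving.

import java.util.ArrayList;
import java.util.List;

interface RMServiceSketch {
  void init();
  void start();
}

class ResourceManagerSketch {
  private final List<RMServiceSketch> services = new ArrayList<RMServiceSketch>();

  void addService(RMServiceSketch s) {
    services.add(s);
  }

  // Mirrors the startup flow above: init all services, replay persisted state,
  // and only then start serving external requests.
  void initAndStart(Object persistedState) {
    for (RMServiceSketch s : services) {
      s.init();                // e.g. ResourceTrackerService, ClientRMService, ...
    }
    recover(persistedState);   // the recovery pass described in this article
    for (RMServiceSketch s : services) {
      s.start();               // the RM is now ready to serve
    }
  }

  void recover(Object persistedState) {
    // replay RMNode and ApplicationInfo records; see the recover() snippets below
  }
}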

I. Goals of recovery

• App recovery: applications that had not finished when the RM went down are re-run after the RM restarts.

Three possible strategies for app recovery:

<1> Recompute the entire job from scratch.

<2> Preserve the completed map and reduce tasks and recompute only the unfinished tasks.

<3> Preserve each task's progress and resume from the checkpoint; for example, if a task was 20% done, the AM resumes it from the 20% mark after restart.

(The third option is essentially infeasible: while a job runs, user code often keeps global state such as global counters or custom variables. That state is controlled by the user's program, and the framework has no practical way to capture it and persist it to disk for recovery; see the sketch after this list.)

• Resource recovery: re-register the cluster's node (NodeManager) information with the RM.
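To see why strategy <3> is so hard, consider a small, purely illustrative MapReduce mapper (the class name, counter names and field are made up): it keeps a local field and a user-defined counter that only the user's code knows about, so the framework has no general way to checkpoint the task's true progress.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterMapperSketch extends Mapper<LongWritable, Text, Text, LongWritable> {
  private long localTally = 0; // in-memory state invisible to the framework

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    localTally++;
    // A user-defined counter; intermediate values live only in the task process,
    // so there is nowhere the RM/AM could faithfully restore "20% done" from.
    context.getCounter("app", "records-seen").increment(1);
    context.write(new Text(value), new LongWritable(1));
  }
}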

II. Work YARN has already done for RM recovery

1. RM state information

RM state in YARN consists of two kinds of information:

• RMNode: the state of the cluster's nodes (NodeManagers)

• ApplicationInfo: the state of submitted applications

2. How RM state is stored

YARN currently provides only two ways to store RM state:

• In-memory storage (MemStore)

• ZooKeeper-based storage (ZKStore)

Which store is used is selected with the following configuration parameters:

yarn.resourcemanager.store.class: selects the RM state store implementation; the options are MemStore and ZKStore.

yarn.resourcemanager.zookeeper-store.address: when the ZK store is used, specifies the ZooKeeper address at which the state is stored.
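For illustration, the two properties could also be set programmatically as in the sketch below; normally they would go into yarn-site.xml, and the fully-qualified ZKStore class name is given from memory and should be checked against your Hadoop version.

import org.apache.hadoop.conf.Configuration;

public class StoreConfigExample {
  public static Configuration rmStoreConf() {
    Configuration conf = new Configuration();
    // Use the ZooKeeper-backed store instead of the in-memory MemStore.
    // The class name below is an assumption; verify it in your Hadoop release.
    conf.set("yarn.resourcemanager.store.class",
        "org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKStore");
    // ZooKeeper address used by the ZK store; host and port are placeholders.
    conf.set("yarn.resourcemanager.zookeeper-store.address", "zk-host:2181");
    return conf;
  }
}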

3. RM recovery [not usable]

Since YARN is still at an early (alpha) stage, the RM recovery code is only partially implemented; it is not usable as a whole and is shown here for reference only. Some of the key code follows.

<1> RMNode recovery:

Re-registers the cluster's node information with the RM.

The code in ResourceTrackerService:

public void recover(RMState state) {
  // Re-register every NodeManager that was known before the restart.
  List<RMNode> nodeManagers = state.getStoredNodeManagers();
  for (RMNode nm : nodeManagers) {
    createNewNode(nm.getNodeID(), nm.getNodeHostName(), nm.getCommandPort(),
        nm.getHttpPort(), nm.getNode(), nm.getTotalCapability());
  }
  // Re-attach each stored application's containers to the node they ran on.
  for (Map.Entry<ApplicationId, ApplicationInfo> entry : state
      .getStoredApplications().entrySet()) {
    List<Container> containers = entry.getValue().getContainers();
    List<Container> containersToAdd = new ArrayList<Container>();
    for (Container c : containers) {
      RMNode containerNode = this.rmContext.getNodesCollection()
          .getNodeInfo(c.getNodeId());
      containersToAdd.add(c);
      containerNode.allocateContainer(entry.getKey(), containersToAdd);
      containersToAdd.clear();
    }
  }
}

<2> ApplicationInfo recovery:

Invokes the resource scheduler (CapacityScheduler, FairScheduler, or FifoScheduler) to re-allocate resources (containers) and re-run the applications that had not finished.

The scheduler is configured with the parameter yarn.resourcemanager.scheduler.class; the default is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler, the capacity scheduler.

The code in CapacityScheduler:

@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
  applications.clear();
  for (Map.Entry<ApplicationId, ApplicationInfo> entry :
      state.getStoredApplications().entrySet()) {
    ApplicationId appId = entry.getKey();
    ApplicationInfo appInfo = entry.getValue();
    // Note: applications was just cleared, so this lookup returns null in the
    // prototype -- one of the reasons this recovery path is not usable as-is.
    SchedulerApp app = applications.get(appId);
    app.allocate(appInfo.getContainers());
    // Hand each stored container back to its queue so the queue's resource
    // accounting reflects the pre-restart allocations.
    for (Container c : entry.getValue().getContainers()) {
      Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
      queue.recoverContainer(clusterResource, applications.get(appId), c);
    }
  }
}

III. Work YARN has not yet done for RM recovery

<1> When RM state should be persisted

<2> Recovering from the persisted state after an RM failure

<3> RM HA

IV. Discussion in the Hadoop YARN community

The discussion of RM recovery on the Hadoop YARN JIRA currently centers on the following points:

Point 1: what state information RM recovery needs

One view is that only the information about applications that have not finished needs to be stored.

Another view is that both application information and NodeManager information should be stored, as the current partial YARN implementation does.

At the same time, some consider the state YARN currently persists to be problematic: on the one hand it is insufficient for recovery, and on the other hand it contains redundant information that does not need to be persisted.

Point 2: when RM state should be persisted

There is currently no implementation of this in the code at all.

Point 3: how RM state should be stored

There are three candidate stores:

MemStore: in-memory storage. Since the state must survive an RM restart, a purely in-memory store is unlikely to be adopted.

DiskStore: disk storage. There is no corresponding code in Hadoop 2.0.2 yet, but it is probably present in the version currently under development.

ZKStore: ZooKeeper-based storage.
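As a purely hypothetical sketch (not the actual YARN Store/Recoverable interface), the minimal contract all three stores would have to satisfy looks roughly like this:

import java.util.HashMap;
import java.util.Map;

interface RMStateStoreSketch {
  void storeNode(String nodeId, byte[] nodeRecord) throws Exception;       // RMNode info
  void storeApplication(String appId, byte[] appRecord) throws Exception;  // ApplicationInfo
  RMStateSketch load() throws Exception;  // MemStore / DiskStore / ZKStore differ only here
}

class RMStateSketch {
  final Map<String, byte[]> nodes = new HashMap<String, byte[]>();
  final Map<String, byte[]> applications = new HashMap<String, byte[]>();
}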

Point 4: how to recover after an RM failure

This is closely related to point 1.

V. Recovery proposals and open questions currently raised in the community

https://issues.apache.org/jira/browse/YARN-128

Tsuyoshi OZAWA added a comment - 27/Jul/12 10:42

Yeah, it’s not trivial what to save into ZK or the local disk of RM.
I’m going to look at the code too, and post them here.

Tsuyoshi OZAWA added a comment - 29/Jul/12 10:14

I’ve looked around the code of RM, and I’ve found that the current Recoverable interface provides storing the states as follows:
1. information about application(application ids and info defined in ApplicationId.java and ApplicationSubmissionContext.java).
2. Information about node managers(info about Node Manager defined in RMNode.java).

(Summary: the current Recoverable interface supports storing two kinds of state: application information and NodeManager information.)

My questions are:
1. Are the states enough to store? In my looking around the code, RMContext has the other states, however, the states are recoverable without the store.
2. When the states should be saved onto the store?
3. When the interface getLastLoggedNodeId() is used?

  Summary of the questions:
  1. Are these states sufficient to store? RMContext holds other state, but it can be recovered without the store.
  2. When should these states be saved to the store?
  3. When is the getLastLoggedNodeId() interface used?

IMHO, we should go step by step as follows:
1. Define the states of RM, which are preserved onto MemStore/DiskStore/ZKStore.
2. Implement the resurrectable version when the RM crashed(ex. DiskStore/ZKStore).
Prototyping 2 and testing it will prove the correctness of 1.

  Summary of the proposed steps:
  1. Define the RM state to be preserved in MemStore/DiskStore/ZKStore.
  2. Implement a version of the RM that can be resurrected after a crash (e.g. with DiskStore/ZKStore).

If you have any ideas, please let me know.

Bikas Saha added a comment - 06/Aug/12 07:48

I think the current implementation (actual code/commented code/todo’s etc) looks like a prototype which may not be in sync with the current state of the functional code. So I am not sure about using it as is.

(Summary: the current implementation looks like a prototype and may not be in sync with the current functional code.)

Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.

(Summary: the implementation makes blocking calls to ZK and the like, so if a lot of state has to be synced to stable storage it is likely to become a bottleneck for RM threads/performance.)

On that note, my gut feeling is that the RM state in practice is, in a sense, the sum total of the current state of the cluster as reflected in the NM’s. So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM’s in a reasonable amount of time. The NM’s anyways have to re-sync with the RM after it comes back up. So that is not extra overhead.

(Summary: the RM state is, in practice, just the current state of the cluster as reflected by the NMs. As long as the RM can rebuild that state from the NMs in a reasonable amount of time, nothing needs to be persisted; the NMs have to re-sync with the RM after it comes back up anyway, so this adds no extra overhead.)

Saving a lot of state would result in having to solve the same set of issues that the Namenode has to solve in order to maintain consistent, reliable and available saved state. IMO, for the RM we are better off avoiding those issues.
The only state that needs to be save, as far as I can see, is the information about all jobs that are not yet completed. This information is present only in the RM and so needs to be preserved across RM restart. Fortunately, this information is small and infrequently updated. So saving it synchronously in ZK may not be too much of an issue.

(Summary: the only state that needs to be saved is the information about jobs that have not yet completed. That information exists only in the RM, so it must survive an RM restart; it is small and updated infrequently, so saving it synchronously in ZK should not be much of a problem.)

Tsuyoshi OZAWA added a comment - 08/Aug/12 09:03

> So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM’s in a reasonable amount of time.

It’s good idea to avoid saving recoverable states without storing. It’s uncertain that it can be recoverable in a reasonable amount of time, so prototyping is needed.

(Summary: it is uncertain whether recovery can complete in a reasonable amount of time, so prototyping is needed.)

> The only state that needs to be save, as far as I can see, is the information about all jobs that are not yet completed.

I agree with you. I’ll check whether the states of WIP jobs is defined correctly or not.

> Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.

I think, to avoid being the bottleneck, RM should have a dedicated thread to save the states of RM. The main thread can send the requests of saving the states to the dedicated thread without blocking by using queue or something. Using async APIs to save the states is also effective, however, the code can get complicated.

(Summary: to avoid this bottleneck, the RM should have a dedicated thread for saving RM state. The main thread can hand save requests to that thread through a queue or a similar structure without blocking. Using async APIs to save the state would also work, but the code gets more complicated. A sketch of the dedicated-thread idea follows.)
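A minimal sketch of that idea, assuming a hypothetical StateStore interface: RM threads only enqueue state snapshots, and a single dedicated thread drains the queue and performs the slow writes to ZK or disk.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncStateSaver implements Runnable {
  // Hypothetical store abstraction; in YARN this would be the ZK/disk-backed store.
  interface StateStore {
    void save(byte[] snapshot) throws Exception;
  }

  private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<byte[]>();
  private final StateStore store;
  private volatile boolean running = true;

  public AsyncStateSaver(StateStore store) {
    this.store = store;
  }

  // Called from RM/dispatcher threads; never blocks on the slow store.
  public void requestSave(byte[] snapshot) {
    pending.offer(snapshot);
  }

  @Override
  public void run() {
    while (running || !pending.isEmpty()) {
      try {
        byte[] snapshot = pending.poll(1, TimeUnit.SECONDS);
        if (snapshot != null) {
          store.save(snapshot);   // the only place that talks to ZK/disk
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        running = false;
      } catch (Exception e) {
        // a real implementation would retry or report a store error to the RM
      }
    }
  }

  public void shutdown() {
    running = false;
  }
}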

Vinod Kumar Vavilapalli added a comment - 24/Sep/12 20:39

Pasting notes from Bikas inline for easier discussion.

Basic Idea:

Key idea is that the state of the cluster is its current state. So don’t save all container info.
RM on startup sets a recovery flag on. Informs scheduler via API.

(Summary: the state of the cluster is simply its current state, so there is no need to save per-container information. On startup the RM sets a recovery flag and informs the scheduler through an API.)

Re-create running AM info from persisted state. Running AM’s will heartbeat to the RM and be asked to re-sync.

(Summary: the information about running AMs is re-created from persisted state; running AMs heartbeat to the RM and are asked to re-sync.)

Re-start AM’s that have been lost. What about AM’s that completed during restart. Re-running them should be a no-op.

(Summary: AMs that have been lost are restarted; for AMs that completed during the restart, re-running them should be a no-op.)

Ask running and re-started AM’s to re-send all pending container requests to re-create pending request state.

(Summary: running and restarted AMs are asked to re-send all of their pending container requests so that the pending-request state can be re-created.)

RM accepts new AM registrations and their requests.

(Summary: the RM accepts new AM registrations and their requests.)

Scheduling pass is not performed when recovery flag is on.

(Summary: no scheduling pass is performed while the recovery flag is on.)

RM waits for nodes to heartbeat and give it container info.

(Summary: the RM waits for the nodes to heartbeat and report their container information.)

RM passes container info to scheduler so that the scheduler can re-create current allocation state.

(Summary: the RM passes the container information to the scheduler so the scheduler can re-create the current allocation state.)

After recovery time threshold, reset recovery flag and start the scheduling pass. Normal from thereon.

(Summary: after the recovery time threshold is reached, the recovery flag is reset and the scheduling pass starts; operation is normal from then on.)

Schedulers could save their state and recover previous allocation information from that saved state.

(Summary: schedulers could also save their own state and recover previous allocation information from it.)

What info comes in node heartbeats:

Handle sequence number mismatch during recovery. On heartbeat from node send ReRegister command instead of Reboot. NodeManager should continue running containers during this time.

(Summary: sequence-number mismatches during recovery have to be handled: when a node heartbeats, the RM sends it a ReRegister command instead of a Reboot, and the NodeManager keeps running its containers in the meantime.)

RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?

(Summary: the RM sends commands back to clean up containers/applications. Can orphan containers be left behind on nodes after an RM restart? Will the NM be able to clean up containers automatically?)

ApplicationAttemptId can be gotten from Container objects to map resources back to SchedulingApp.

(Summary: the ApplicationAttemptId can be obtained from the Container objects in order to map resources back to the corresponding SchedulingApp.)

How to pause scheduling pass:

Scheduling pass is triggered on NODE_UPDATE events that happen on node heartbeat. Easy to pause under recovery flag.

(Summary: the scheduling pass is triggered by the NODE_UPDATE events produced by node heartbeats, so it is easy to pause while the recovery flag is on; a sketch follows.)
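A minimal sketch of that gating, with hypothetical types (this is not the actual YARN scheduler code):

public class RecoveryAwareSchedulerSketch {
  private volatile boolean recovering = true;   // recovery flag set by the RM at startup

  public void finishRecovery() {
    recovering = false;                         // normal scheduling from here on
  }

  // Invoked for every NODE_UPDATE event produced by a node heartbeat.
  public void handleNodeUpdate(Object nodeStatus) {
    recreateAllocationState(nodeStatus);        // always rebuild current allocations
    if (recovering) {
      return;                                   // pause the scheduling pass during recovery
    }
    schedulingPass(nodeStatus);                 // allocate new containers as usual
  }

  private void recreateAllocationState(Object nodeStatus) { /* ... */ }

  private void schedulingPass(Object nodeStatus) { /* ... */ }
}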

YarnScheduler.allocate() is the API that needs to be changed.

(Summary: YarnScheduler.allocate() is the API that needs to be changed.)

How to handle container releases messages that were lost when RM was down? Will AM’s get delivery failure and continue to resend indefinitely?

(Summary: how should container-release messages that were lost while the RM was down be handled? Will the AMs see the delivery failure and keep resending indefinitely?)

How to re-create scheduler allocation state:

On node re-register, RM passes container info to scheduler so that the scheduler can re-create current allocation state.

(Summary: when a node re-registers, the RM passes its container information to the scheduler so the scheduler can re-create the current allocation state.)

Use CsQueue.recoverContainer() to recover previous allocations from currently running containers.

(Summary: CsQueue.recoverContainer() is used to recover previous allocations from the currently running containers.)

How to re-synchronize pending requests with AM's:

Need new AM-RM API to resend asks from AM to RM.

(Summary: a new AM-RM API is needed so that AMs can resend their asks to the RM.)

Keep accumulating asks from AM’s like it currently happens when allocate() is called.

(Summary: keep accumulating asks from the AMs, just as happens today when allocate() is called.)

How to persist AM state:

Store AM info in a persistent ZK node that uses version numbers to prevent out of order updates from other RM’s. One ZK node per AM under a master RM ZK node. AM submission creates ZK node. Start and restart update ZK node. Completion clears ZK node.

(Summary: AM information is stored in a persistent ZK node, using version numbers to prevent out-of-order updates from other RMs. There is one ZK node per AM under a master RM ZK node: AM submission creates the node, start and restart update it, and completion clears it. A sketch of the versioned-znode idea follows.)
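A minimal sketch of the versioned-znode idea using the plain ZooKeeper client API; the znode layout and the serialized payload here are assumptions, not the actual YARN-128 format.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AmZkRecordSketch {
  private final ZooKeeper zk;
  private final String rmRoot = "/rm-state/ams"; // assumed layout, not the YARN one

  public AmZkRecordSketch(ZooKeeper zk) {
    this.zk = zk;
  }

  // AM submission: create the per-AM znode under the RM root.
  public void create(String appAttemptId, byte[] amInfo)
      throws KeeperException, InterruptedException {
    zk.create(rmRoot + "/" + appAttemptId, amInfo,
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }

  // AM start/restart: update, passing the version we last read. A stale RM that
  // read an older version gets KeeperException.BadVersionException and backs off.
  public void update(String appAttemptId, byte[] amInfo, int expectedVersion)
      throws KeeperException, InterruptedException {
    zk.setData(rmRoot + "/" + appAttemptId, amInfo, expectedVersion);
  }

  // AM completion: clear the record.
  public void remove(String appAttemptId, int expectedVersion)
      throws KeeperException, InterruptedException {
    zk.delete(rmRoot + "/" + appAttemptId, expectedVersion);
  }
}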

Metrics:

What needs to be done to maintain consistency across restarts. New app attempt would be a new attempt but what about recovered running apps.

(Summary: what has to be done to keep metrics consistent across restarts? A new app attempt is clearly a new attempt, but what about recovered, still-running apps?)

Security:

What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers. ZK nodes themelves should be secure.

(Summary: which key and token information needs to be persisted across a restart so that existing secure containers keep running with the new RM and new containers? The ZK nodes themselves must also be secured.)

Vinod Kumar Vavilapalli added a comment - 24/Sep/12 21:17

+1 for most of your points. Some specific comments:

What about AM’s that completed during restart. Re-running them should be a no-op.

AMs should not finish themselves while the RM is down or recovering. They should just spin.

(Summary: AMs should not finish while the RM is down or recovering; they should simply spin.)

How to handle container releases messages that were lost when RM was down? Will AM’s get delivery failure and continue to resend indefinitely?

You mean release requests from AM? Like above, if AMs just spin, we don’t have an issue.

Need new AM-RM API to resend asks from AM to RM.

See AMResponse.getRebott(). That can be used to inform AMs to resend all details.

What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers.

We already noted this as java comments in code. Need to put in proper documentation.

ZK nodes themelves should be secure.

Good point. Worst case that ZK doesn’t support security, we can rely on a RM specific ZK instance and firewall rules.

More requirements:

  • An upper bound (time) on recovery?

(Summary: there should be an upper bound on recovery time.)

  • Writing to ZK shouldn’t add more than x% (< 1-2%) to app latency?

(Summary: writing to ZK should not add more than about 1-2% to application latency.)

More state to save:

  • New app submissions should be persisted/accepted but not acted upon during recovery.

(Summary: new application submissions during recovery should be persisted/accepted but not acted upon.)

Miscellaneous points:

  • I think we should add a new ServiceState call Recovering and use the same in RM.
  • Overall, clients, AMs and NMs should spin while the RM is down or doing recovery. Also we need to handle fail-over of RM, should do as part of a separate ticket.

(Summary: clients, AMs and NMs should spin while the RM is down or recovering; RM fail-over should be handled in a separate ticket.)

  • When is recovery officially finished? When all running AMs sync up? I suppose so, that would be an upper bound equaling AM-expiry interval.

(Summary: when is recovery officially finished? Presumably when all running AMs have synced up, which gives an upper bound equal to the AM expiry interval.)

  • Need to think of how the RM-NM shared secret roll-over is affected, if RM is down for a significant amount of item

(Summary: if the RM stays down for a significant amount of time, the effect on the RM-NM shared-secret roll-over also has to be considered.)

Robert Joseph Evans added a comment - 24/Sep/12 22:10

AMs should not finish themselves while the RM is down or recovering. They should just spin.

+1 for that. If we let the MR AM finish, and then the RM comes up and tries to restart it will get confused because it will not find the job history log where it expects to see it which will cause it to restart, and it is likely to find the output directory already populated with data, which could cause the job to fail. What is worse it may not fail, because I think the output committer will ignore those errors. The first AM could inform oozie that the job finished through a callback, and a second job may be launched and is reading the data at the time that the restarted first job is trying to write that data, which could cause inconsistent results or cause the second job to fail somewhat randomly.

(Summary: if the MR AM is allowed to finish while the RM is down, then when the RM comes back up and tries to restart it, the restarted AM will not find the job history log where it expects it, so it will re-run the job, and it will likely find the output directory already populated with data, which could make the job fail. Worse, it may not fail at all, because the output committer will probably ignore those errors. Meanwhile the first AM may already have told Oozie via a callback that the job finished, so a second job may be launched and start reading the data just as the restarted first job is rewriting it, leading to inconsistent results or the second job failing more or less at random.)

An upper bound (time) on recovery?

This is a bit difficult to determine because the RM is responsible for renewing tokens. Right now it will renew them when they only have about 10% of their time left before they expire. So it depends on how long the shortest token you have in flight is valid for before it needs to be renewed. In general all of the tokens I have seen are for 24 hours, so you would have about 2.4 hours to bring the RM back up and read in/start renewing all of the tokens or risk tokens expiring.

(Summary: this bound is hard to pin down because the RM is responsible for renewing tokens, and it currently renews them when only about 10% of their lifetime remains. The bound therefore depends on how long the shortest outstanding token remains valid before it must be renewed; tokens are typically valid for 24 hours, which leaves roughly 2.4 hours to bring the RM back up and resume renewing tokens before they risk expiring. The arithmetic is sketched below.)
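The 2.4-hour figure is simply 10% of a 24-hour token lifetime; a trivial sketch of the arithmetic:

public class TokenRenewWindowSketch {
  public static void main(String[] args) {
    double tokenLifetimeHours = 24.0;  // typical delegation token lifetime
    double renewFraction = 0.10;       // RM renews when ~10% of the lifetime remains
    double recoveryWindowHours = tokenLifetimeHours * renewFraction;
    System.out.println("RM must be back within ~" + recoveryWindowHours + " hours"); // 2.4
  }
}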

Thomas Graves added a comment - 25/Sep/12 02:03

RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?

Containers can currently be lost. See YARN-72 and YARN-73. Once its changed so RM doesn’t always reboot the NM’s that will get a bit better but its still possible so we will have to handle somehow. Since the NM could crash it almost needs a way to check on startup whats running and at that point decide if it should clean them up. It does have a .pid file for the containers but you would have to be sure that process is the same one as when the NM went down.

(Summary: containers can currently be lost; see YARN-72 and YARN-73. Once the RM no longer always reboots the NMs this will get a bit better, but it will still be possible, so it has to be handled somehow. Since the NM itself can crash, it really needs a way to check at startup what is still running and decide whether to clean it up. The NM does keep a .pid file for each container, but you would have to be sure the process is the same one that was running when the NM went down.)

Thomas Graves added a comment - 25/Sep/12 03:40

What about AM’s that completed during restart. Re-running them should be a no-op.

AMs should not finish themselves while the RM is down or recovering. They should just spin.

Doesn’t the RM still need to handle this. The client could stop the AM at any point by talking directly to it. Or since anyone can write an AM it could simply finish on its own. Or perhaps timing issue on app finish. How does the RM tell the difference? We can have the MR client/AM handle this nicely but even then there could be a bug or expiry after so long. So perhaps if the AM is down it doesn’t get restarted? Thats probably not ideal if app happens to go down at the same time as the RM though – like a rack gets rebooted or something, but otherwise you have to handle all the restart issues, like Bobby mentioned above.

(Summary: doesn't the RM still need to handle this case? The client can talk directly to the AM and stop it at any point; since anyone can write an AM, an AM could simply finish on its own; or there could be timing issues around app completion. How does the RM tell the difference? The MR client/AM can be made to handle this nicely, but even then there could be a bug or a long expiry. Perhaps an AM that is down during recovery simply is not restarted? That is not ideal if the app happens to go down at the same time as the RM, for example when a whole rack is rebooted, but otherwise all the restart issues Bobby mentioned above have to be handled.)

Robert Joseph Evans added a comment - 25/Sep/12 16:06

The problem is that we cannot be truly backwards compatible when adding in this feature. We have to better define the lifecycle of an AM for it to be “well behaved” and properly handle RM recovery. I would say that if the client asks the AM to stop it should still pause on unregister until it can successfully unregister, or until it can mark itself as “killed” in a persistent way like with the job history log, so that when that AM is relaunched all it has to do is to check a file on HDFS and then unregister. Perhaps the only way to be totally backwards compatible is for the AM to indicate when it registers if it supports RM recovery or not. Or to avoid any race conditions when the client launches the AM it would indicate this. If it does not (legacy AMs), then the RM will not try to relaunch it if the AM goes down while the RM is recovering. If it does, then the AM will always be relaunched when the RM goes down.

(Summary: the problem is that this feature cannot be made truly backwards compatible. The AM lifecycle has to be defined more precisely so that a "well behaved" AM handles RM recovery properly. If the client asks an AM to stop, the AM should keep waiting at unregister until it can successfully unregister, or mark itself as "killed" in some persistent place such as the job history log, so that when it is relaunched all it has to do is check a file on HDFS and then unregister. Perhaps the only fully backwards-compatible option is for an AM to indicate when it registers whether it supports RM recovery, or, to avoid race conditions, for the client to indicate this when launching the AM: legacy AMs that go down while the RM is recovering are not relaunched, while recovery-aware AMs are always relaunched after the RM goes down.)

Devaraj K added a comment - 18/Oct/12 07:54

Attaching the first version of patch. I have tested in a small cluster with FIFO & Capacity schedulers by making RM down and up while running the application and continued the application without any failures.

Arun C Murthy added a comment - 06/Nov/12 01:34

Agree with Bobby’s concerns.

For now I think the first step should be to merely restart all apps on RM restart, something similar to MR1 today.

(Summary: for now, the first step for RM restart should be simply to restart all applications, much like MR1 does today.)

Bikas – can I pls suggest this as a first step? Thanks!

Bikas Saha added a comment - 06/Nov/12 17:28

yeah. I have been thinking on similar lines too. Working on a refreshed proposal and code patch.

Bikas Saha added a comment - 06/Nov/12 17:33

Devaraj, I think the current approach+code based on zkstore (that YARN-128.patch builds on top of) has some significant issues wrt perf/scalability of ZK/future HA. The design outline attached to this jira calls out some of the issues. The next proposal document will help clarify a bit more I hope.

Bikas Saha added a comment - 09/Nov/12 18:46

Attaching a proposal doc and code for the first iteration. The proposal is in the same lines as the earlier initial design sketch but limits the first iteration of the work to restarting the applications after the RM comes back up. The reasoning and ideas are detailed in the doc.

Attaching some code that implements the proposal. It includes a functional test that verifies the end-to-end scenario using an in-memory store. If everything looks good overral then I will tie up the loose ends and add more tests.

For review, the code is broken into 1) removal of old code 2) new code + test. There are TODO comments in the code where folks could make suggestions. The code is attached in full for a build and test pass on Jenkins because my machine is having long host resolution timeouts. Any ideas on this?

During the testing I found a bug in the CapacityScheduler because of which it fails to activate applications when resources are added to the cluster. Folks can comment on the fix. There is a separate test case that shows the bug and verifies the fix.

Bikas Saha added a comment - 11/Nov/12 14:47

Updating patches for new code and combined patch.
Changes
1) Code added to remove application data upon completion
2) All TODO’s examined and removed/fixed.
3) Improved TestRMRestart and its readability
4) Added more tests for RMAppAttemptTransitions
5) Refactored RMStateStore into an abstract class so that it can implement common functionality to notify app attempt about async store operation completion

Fix for capacity scheduler bug is still in the patch because it blocks test completion. The issue is also tracked in YARN-209

Bikas Saha added a comment - 12/Nov/12 05:29

Attaching rebased patches

Bikas Saha added a comment - 13/Nov/12 12:03

Attaching rebased patches + change RMStateStore to throw exception to notify about store errors.

Arinto Murdopo added a comment - 15/Nov/12 08:35

Based on the YARN-128.full-code-4.patch, I have these following observations:

1) In TestRMRestart.java Line 78, app1 and appState refer to the same instance because we are using memory to store the states (MemoryRMStateStore). Therefore, the assert result will always be True.

2) ApplicationState is stored when we invoke MockRM’s submitApp method. More precisely, it is in ClientRMService class, line 266. The state that we store contains the resource request from client. In this case, the value of resource request is 200. However, if we wait for some time, the value will be updated to 1024 (which is the normalized value given by the Scheduler).

3)Currently our school project is trying to persist the state in persistent storage, and the assert statement in our modified test class returns error since our storage stores the resource value before updated by the scheduler.

Based on above observations, should we update the persisted memory value with the new value assigned by scheduler?
Since we are going to restart both ApplicationMaster and NodeManager when there is failure in ResourceManager, I think the answer is no, we can use the original value requested by user. But I’m not really sure with my own reasoning.. soo.. please comment on it. . If the answer is yes, then we should wait until Scheduler updates the resource value before persisting it into the storage.

Bikas Saha added a comment - 16/Nov/12 11:03

1) Unless I am mistaken, the test condition is correct. app1 is the app actually submitted while appState is the state retrieved from the store. By checking that both are the same, we are checking that the data that was supposed to be passed has actually been passed to the store and there is no bug in the transfer of that data. The assert will be false if the transfer does not happen or some other value gets passed by mistake. Does that help clarify?

3) Which resource value is this? The one that is store in ApplicationSubmissionContext->ContainerLaunchContext? In the patch, the ApplicationSubmissionContext is being store at the very beginning to ensure that the client does not have to submit the job again. Hence, the Resource set by the client is saved. I am not sure what your project is saving after the scheduling is done.
You are right. We dont want to store the updated value since this updated value is a side-effect of the policy of the scheduler.

I am not sure if this applies to your project. I will be shortly posting an Zookeeper and HDFS state store that you could use unless you are using your own storage mechanism.

Arinto Murdopo added a comment - 16/Nov/12 13:21

1) Yes, I agree with your clarification. It works as what you state when we are using persistent storage (not MemStore, but ZK, MySQL, file or other persistent storage)
However, when we are using MemStore, the stored object (appState) and app1 are referring to the same instance since our “store” is memory. To test my argument, we can put breakpoint in the assert statement that compares the ApplicationSubmissionContext, then use IDE feature to change any value of appState’s properties i.e resource in ApplicationSubmissionContext. The corresponding app1 value (in this case is the resource in app1′s ApplicationSubmissionContext) will also be updated to the same value.

3). Yes, it’s in Resource in ApplicationSubmissionContext->ContainerLaunchContext. e
If we are saving the original resource value requested by client, then the assert statement that compare ApplicationSubmissionContext will not pass.
Let’s say Client request resource of memory with value of 200. We store this in our persistent storage. After we store, scheduler updates the resource with value of 1024. In this case, the resource in app1 instance will be 1024, but the resource that stored in our storage is 200. Hence, it will not pass when we compare them using current assert statement. Maybe we need to keep storing our original resource request in ApplicationSubmissionContext.

Looking forward to your ZK and HDFS state store. The state store in our project is using MySQL cluster.

Tom White added a comment - 16/Nov/12 14:02

Bikas, this looks good so far. Thanks for working on it. A few comments:

  • Is there a race condition in ResourceManager#recover where RMAppImpl#recover is called after the StartAppAttemptTransition from resubmitting the app? The problem would be that the earlier app attempts (from before the resart) would not be the first ones since the new attempt would get in first.
  • I think we need the concept of a ‘killed’ app attempt (when the system is at fault, not the app) as well as a ‘failed’ attempt, like we have in MR task attempts. Without the distinction a restart will count against the user’s app attempts (default 1 retry) which is undesirable.
  • Rather than change the ResourceManager constructor, you could read the recoveryEnabled flag from the configuration.

Bikas Saha added a comment - 16/Nov/12 15:56

@Arinto
Thanks for using the code!
1) Yes. Both are the same object. But that is what the test is testing. That the context that got saved in the store is the same as the one the app was submitted with. We are doing this with an in memory store that lets us examine the stored data and compare it with the real data. A real store would save this the data. So comparison is not possible.
3) Yes. It seems incorrect to store scheduler side-effects. e.g. upon restart if the scheduler config make minimum container size = 512 then again it will not match.
I am attaching a patch for a ZK store that you can try. It applies on top of the current full patch.

@Tom
Thanks for reviewing!
1) There is no race condition because the Dispatcher has not been started yet and hence the attempt start event has not been processed. There is a comment to that effect in the code.
2) I agree. I had thought about it too. But it looks like the current behavior (before this patch) does this because it does not differentiate killed/failed attempts when deciding that the attempt retry limit has been reached. So I thought about leaving it for a separate jira which would be unrelated to this. Once that is done this code could use it and not count the restarted attempt. This patch is already huge. Does that sound good?
3) Yes. That could be done. The constructor makes it easier to write tests without mangling configs.

Tom White added a comment - 16/Nov/12 16:14

You are right about there being no race – I missed the comment! I opened YARN-218 for the killed/failed distinction as I agree it can be tackled separately.

Bikas Saha added a comment - 17/Nov/12 17:39

Updated ZK and FileSystem store patches. FileSystem patch applies after ZK patch.

Tom White added a comment - 19/Nov/12 17:48

I had a quick look at the new patches and FileSystemRMStateStore and ZKRMStateStore seem to be missing default constructors, which StoreFactory needs. You might change the tests to use StoreFactory to construct the store instances to test this code path.

Bikas Saha added a comment - 19/Nov/12 20:31

Thanks for looking at the patches while work is still in progress. That helps a lot!
Yes. I am working on that currently. The 2 also have a lot of duplicated code which I am moving into the base class. I will soon create a few sub tasks and post the final patches in them so that its easier to review and commit them.

Bikas Saha added a comment - 20/Nov/12 13:14

Attaching final patch with full changes for a test run. Can someone with access please trigger a test run on JIRA?
Changes
1) Completed handling on unmanaged AM’s
2) Refactored ZK and FileSystem store classes to move common logic into the base class and also integrate with the RM
3) Test improvements
I have tested manually on a single node with both ZK and FileSystem store (using HDFS) and run wordcount job across a restart.

I will create sub-tasks of this jira to break the changes into logical pieces.

Bikas Saha added a comment - 20/Nov/12 15:16

Done creating sub-tasks and attaching final patches for review and commit.

Arinto Murdopo added a comment - 28/Nov/12 16:19

Tested the YARN-128.full-code.5.patch, using ZooKeeper store and the result is positive. ResourceManager resurrected properly after we killed it.
Experiment overview:

  • ZK settings: 1 ZK-Server consisted of 3 different nodes
  • HDFS was in single-node setting. YARN and HDFS was executed in the same node.
  • Executed bbp and pi examples from the generated hadoop distribution (we built and packaged the trunk and patch code)
  • Killed ResourceManager process when bbp or pi was executing(using Linux kill command) and started new RM 3 seconds after we killed it.

Strahinja Lazetic added a comment - 04/Dec/12 11:08

Bikas, I have one question; Since we reboot NMs and terminate all the running containers and AMs upon the RM restart, why do we need to keep track of the previous Applications’ attempts? Couldn’t we just start “from scratch” instead of generating the next attempt id based on the last running one?

References:

https://issues.apache.org/jira/browse/YARN-128

https://issues.apache.org/jira/browse/MAPREDUCE-4343

https://issues.apache.org/jira/secure/attachment/12549649/YARN-128.patch

https://issues.apache.org/jira/secure/attachment/12532336/MR-4343.1.patch
