YARN 2.2.0 Fair Scheduler Allocation Flow

 
There are two main threads to follow here:


1. ApplicationMasterService
ApplicationMasterService serves the AppMaster's remote resource requests by overriding ApplicationMasterProtocol's AllocateResponse allocate(AllocateRequest request) method; the same call also doubles as the AM heartbeat.

AllocateResponse allocate(AllocateRequest request) {
  ...
  // Send new requests to appAttempt.
  Allocation allocation =
      this.rScheduler.allocate(appAttemptId, ask, release,
          blacklistAdditions, blacklistRemovals);
  // ask holds the resources being requested; release holds the containers the AM is giving back
  ...
}

Take FairScheduler's implementation as an example:

public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals) {
  ...
  // Release the containers the AM has given back
  for (ContainerId releasedContainerId : release) {
    ...
  }

  // Update application requests
  application.updateResourceRequests(ask);
  ...

  return new Allocation(application.pullNewlyAllocatedContainers(),
      application.getHeadroom(), preemptionContainerIds);
  // application.pullNewlyAllocatedContainers() works here because at allocation time,
  // RMContainer org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp.allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, Container container)
  // has already recorded each newly allocated container in FSSchedulerApp's
  // newlyAllocatedContainers list, so this call simply drains that list. That is how the
  // already-allocated container information gets back to the AM, which then uses it to
  // launch the corresponding containers on the NMs.
}
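The "record at allocation time, drain at response time" pattern behind pullNewlyAllocatedContainers() can be sketched as follows. This is a simplified model, not the real YARN classes: String stands in for RMContainer, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch: the scheduler appends to newlyAllocatedContainers as it
// allocates, and the allocate() response path drains the list so each
// container is reported to the AM exactly once.
class SchedulerAppSketch {
    private final List<String> newlyAllocatedContainers = new ArrayList<>();

    // Called by the scheduler when a container is allocated on some node.
    void allocate(String containerId) {
        newlyAllocatedContainers.add(containerId);
    }

    // Called when building the Allocation response: returns the accumulated
    // containers and clears the list.
    List<String> pullNewlyAllocatedContainers() {
        List<String> pulled = new ArrayList<>(newlyAllocatedContainers);
        newlyAllocatedContainers.clear();
        return pulled;
    }
}
```

A second pull returns nothing, which is why a container is never handed to the AM twice.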
Here is the update method:

public synchronized void updateResourceRequests(
    List<ResourceRequest> requests) {
  this.appSchedulingInfo.updateResourceRequests(requests);
}
Following it down:

void org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo.updateResourceRequests(List<ResourceRequest> requests) // updates this application's resource requests

updateResourceRequests(List<ResourceRequest> requests) {
  for (ResourceRequest request : requests) {
    Priority priority = request.getPriority();
    String resourceName = request.getResourceName(); // a host name, a rack name, or "*"
    ...
    Map<String, ResourceRequest> asks = this.requests.get(priority);
    ...
    asks.put(resourceName, request);
    ...
  }
}
The purpose of this method is to update the application's request lists, one per priority:

final Map<Priority, Map<String, ResourceRequest>> requests =
    new HashMap<Priority, Map<String, ResourceRequest>>();

That is the gist of what an AppMaster request does: it updates appSchedulingInfo's per-priority request map.
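The nested map update above can be sketched with plain types. This is a simplified model: int stands in for Priority and for the request's container count, and the method names are stand-ins, not the real AppSchedulingInfo API.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of AppSchedulingInfo's request table: keyed first by
// priority, then by resource name (host, rack, or "*").
class AppSchedulingInfoSketch {
    private final Map<Integer, Map<String, Integer>> requests = new HashMap<>();

    // Each incoming request overwrites any earlier one for the same
    // (priority, resourceName) pair, mirroring asks.put(resourceName, request).
    void updateResourceRequest(int priority, String resourceName, int numContainers) {
        requests.computeIfAbsent(priority, p -> new HashMap<>())
                .put(resourceName, numContainers);
    }

    Integer getResourceRequest(int priority, String resourceName) {
        Map<String, Integer> asks = requests.get(priority);
        return asks == null ? null : asks.get(resourceName);
    }
}
```

Because the inner map is keyed by resource name, a newer ask simply replaces the older one rather than accumulating; the AM always sends its current total demand.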



2. FairScheduler
This is the allocation side. Follow the case NODE_UPDATE: branch of FairScheduler's public void handle(SchedulerEvent event) all the way down to the

Resource org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable.assignContainer(FSSchedulerNode node, boolean reserved) method:

assignContainer(FSSchedulerNode node, boolean reserved) {
  ...
  for (Priority priority : prioritiesToTry) {
    // Count the scheduling opportunities this app has seen at each priority:
    // +1 on every attempt, reset to 0 when an allocation succeeds
    app.addSchedulingOpportunity(priority);

    ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName());
    // The application's requests at this priority that are local to this node
    ResourceRequest localRequest = app.getResourceRequest(priority, node.getNodeName());

    // allowedLocality matters: it drives the delay scheduling below
    NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
        scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
        scheduler.getRackLocalityThreshold());

    if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
        && localRequest != null && localRequest.getNumContainers() != 0) {
      return assignContainer(node, priority,
          localRequest, NodeType.NODE_LOCAL, reserved);
    }
    ....
  }
}
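The locality preference in that loop can be sketched in isolation: when both a node-local and a rack-local request are outstanding, the node-local one wins. This is a simplified stand-in for the snippet above (a count of 0 means the request at that level is exhausted), not the full method, which also consults allowedLocality.

```java
// Minimal sketch of the locality choice in AppSchedulable.assignContainer.
class LocalityChoiceSketch {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    static NodeType choose(int nodeLocalContainers, int rackLocalContainers) {
        if (rackLocalContainers != 0 && nodeLocalContainers != 0) {
            return NodeType.NODE_LOCAL;   // prefer the tightest locality available
        }
        if (rackLocalContainers != 0) {
            return NodeType.RACK_LOCAL;   // fall back to the same rack
        }
        return NodeType.OFF_SWITCH;       // anywhere in the cluster
    }
}
```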


// The heart of resource allocation: allocate the container if it fits, otherwise reserve
private Resource assignContainer(FSSchedulerNode node,
    Priority priority, ResourceRequest request, NodeType type,
    boolean reserved) {

  // How much does this request need?
  Resource capability = request.getCapability();

  // How much does the node have?
  Resource available = node.getAvailableResource();

  Container container = null;
  if (reserved) {
    container = node.getReservedContainer().getContainer();
  } else {
    container = createContainer(app, node, capability, priority);
  }

  // Can we allocate a container on this node?
  if (Resources.fitsIn(capability, available)) {
    // Inform the application of the new container for this request
    RMContainer allocatedContainer =
        app.allocate(type, node, priority, request, container);
    if (allocatedContainer == null) {
      // Did the application need this resource?
      if (reserved) {
        unreserve(priority, node);
      }
      return Resources.none();
    }

    // If we had previously made a reservation, delete it
    if (reserved) {
      unreserve(priority, node);
    }

    // Inform the node
    node.allocateContainer(app.getApplicationId(), allocatedContainer);

    return container.getResource();
  } else {
    // The desired container won't fit here, so reserve
    reserve(priority, node, container, reserved);

    return FairScheduler.CONTAINER_RESERVED;
  }
}
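The fits-or-reserve branch can be sketched with a toy Resource type. YARN's Resources.fitsIn compares each dimension (memory, vcores); this model, with its made-up class and string return values, does the same per-dimension check.

```java
// Minimal sketch of the allocate-vs-reserve decision in assignContainer.
class FitsOrReserveSketch {
    static class Resource {
        final int memoryMb, vcores;
        Resource(int memoryMb, int vcores) { this.memoryMb = memoryMb; this.vcores = vcores; }
    }

    // Fits only if every dimension of the request fits on the node.
    static boolean fitsIn(Resource smaller, Resource bigger) {
        return smaller.memoryMb <= bigger.memoryMb && smaller.vcores <= bigger.vcores;
    }

    // "ALLOCATE" when the request fits; "RESERVE" otherwise, meaning the app
    // waits on this node until enough resources free up.
    static String assign(Resource capability, Resource available) {
        return fitsIn(capability, available) ? "ALLOCATE" : "RESERVE";
    }
}
```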

The container map in the NM's context records information about all containers on that node.

One more note: the new version implements the delay in delay scheduling as a scheduling-opportunity count, whereas the old version used elapsed time; in practice the two behave much the same.
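Count-based delay scheduling can be sketched as follows. This is a simplified model of the idea behind FSSchedulerApp.getAllowedLocalityLevel, not its real signature: the threshold arithmetic (threshold fraction times cluster size) and the parameter names are assumptions for illustration.

```java
// Minimal sketch: locality is relaxed from NODE_LOCAL to RACK_LOCAL to
// OFF_SWITCH only after the app has passed up enough scheduling opportunities.
class DelaySchedulingSketch {
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    static NodeType allowedLocality(int schedulingOpportunities, int numClusterNodes,
                                    double nodeLocalityThreshold, double rackLocalityThreshold) {
        if (schedulingOpportunities < nodeLocalityThreshold * numClusterNodes) {
            return NodeType.NODE_LOCAL;   // haven't waited long enough: insist on node-local
        }
        if (schedulingOpportunities < rackLocalityThreshold * numClusterNodes) {
            return NodeType.RACK_LOCAL;   // waited a while: accept same-rack
        }
        return NodeType.OFF_SWITCH;       // waited long enough: take any node
    }
}
```

Since addSchedulingOpportunity resets the count to 0 on a successful allocation, the "clock" restarts for each container, just as a time-based delay would.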

In this way, the RM allocates resources through FairScheduler on one side and returns the allocation results through ApplicationMasterService on the other, completing the whole flow.

One open question: after a rack-local request is allocated, how does the AM find out?




