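There are two main threads to follow here:
1. ApplicationMasterService
It implements the AllocateResponse allocate(AllocateRequest request) method of ApplicationMasterProtocol. This is the call through which the AppMaster requests resources remotely, and it also serves as the AM's heartbeat.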
AllocateResponse allocate(AllocateRequest request) {
    ...
    // Send new requests to appAttempt.
    // ask: the resources being requested; release: the containers the AM is giving back
    Allocation allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals);
    ...
}
Take FairScheduler's implementation of allocate as an example:
public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals) {
    ...
    // Release containers
    for (ContainerId releasedContainerId : release) {
        ...
    }
    // Update application requests
    application.updateResourceRequests(ask); // take in the AM's asks
    ...
    return new Allocation(application.pullNewlyAllocatedContainers(),
        application.getHeadroom(), preemptionContainerIds);
    // pullNewlyAllocatedContainers() is enough here because, at assignment time,
    // RMContainer org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp.allocate(
    //     NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, Container container)
    // has already written the newly assigned container into FSSchedulerApp's
    // newlyAllocatedContainers. The return statement only has to fetch that list.
    // This hands the allocated containers back to the AM, which then starts the
    // corresponding Containers on the NMs.
}
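The hand-off described in that comment can be pictured with a toy model. This is only a minimal sketch, not Hadoop code: PullBuffer, onAssigned and pullNewlyAllocated are hypothetical names standing in for FSSchedulerApp's newlyAllocatedContainers list and pullNewlyAllocatedContainers(), and containers are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-application buffer of new containers.
class PullBuffer {
    private final List<String> newlyAllocated = new ArrayList<>();

    // Called on the scheduling path (e.g. NODE_UPDATE) when a container is assigned.
    synchronized void onAssigned(String containerId) {
        newlyAllocated.add(containerId);
    }

    // Called on the AM heartbeat path: return everything assigned since the
    // previous call and clear the buffer so nothing is reported twice.
    synchronized List<String> pullNewlyAllocated() {
        List<String> result = new ArrayList<>(newlyAllocated);
        newlyAllocated.clear();
        return result;
    }
}

class PullBufferDemo {
    public static void main(String[] args) {
        PullBuffer app = new PullBuffer();
        app.onAssigned("container_01");
        app.onAssigned("container_02");
        System.out.println(app.pullNewlyAllocated()); // prints [container_01, container_02]
        System.out.println(app.pullNewlyAllocated()); // prints []
    }
}
```

The point of the sketch is that the heartbeat never triggers an assignment itself. It only collects what the scheduling path has produced since the last heartbeat.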
This is application.updateResourceRequests:
public synchronized void updateResourceRequests(
    List<ResourceRequest> requests) {
    this.appSchedulingInfo.updateResourceRequests(requests);
}
Following the call one level further:
void org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo.updateResourceRequests(List<ResourceRequest> requests) // updates this application's resource requests
updateResourceRequests(List<ResourceRequest> requests) {
    for (ResourceRequest request : requests) {
        Priority priority = request.getPriority();
        String resourceName = request.getResourceName(); // host name, rack name, or * (any)
        ...
        Map<String, ResourceRequest> asks = this.requests.get(priority);
        ...
        asks.put(resourceName, request);
        ...
    }
}
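All this method does is update the application's request table, which is keyed first by priority and then by resource name:
final Map<Priority, Map<String, ResourceRequest>> requests =
    new HashMap<Priority, Map<String, ResourceRequest>>();
That covers, in outline, what one AppMaster request does: it updates the per-priority request maps in appSchedulingInfo.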
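The effect of asks.put(resourceName, request) on the nested map can be sketched as follows. This is an illustration under simplifying assumptions: Ask and RequestTable are hypothetical classes, priorities are plain ints, and the real ResourceRequest carries more fields (capability, relax-locality, and so on).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified request.
class Ask {
    final int priority;
    final String resourceName; // host name, rack name, or "*"
    final int numContainers;

    Ask(int priority, String resourceName, int numContainers) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.numContainers = numContainers;
    }
}

class RequestTable {
    // priority -> (resource name -> latest request)
    final Map<Integer, Map<String, Ask>> requests = new HashMap<>();

    // A newer ask for the same (priority, resourceName) replaces the old one.
    void update(Iterable<Ask> asks) {
        for (Ask ask : asks) {
            requests.computeIfAbsent(ask.priority, p -> new HashMap<>())
                    .put(ask.resourceName, ask);
        }
    }

    // Number of containers currently wanted at this priority and location.
    int outstanding(int priority, String resourceName) {
        Map<String, Ask> byName = requests.get(priority);
        if (byName == null || !byName.containsKey(resourceName)) {
            return 0;
        }
        return byName.get(resourceName).numContainers;
    }
}

class RequestTableDemo {
    public static void main(String[] args) {
        RequestTable table = new RequestTable();
        table.update(Arrays.asList(new Ask(1, "host1", 2), new Ask(1, "*", 2)));
        table.update(Arrays.asList(new Ask(1, "host1", 1))); // overwrites the host1 entry
        System.out.println(table.outstanding(1, "host1")); // prints 1
        System.out.println(table.outstanding(1, "*"));     // prints 2
    }
}
```

Because the entry is overwritten rather than summed, each ask in this model states the current total wanted for that priority and location, not an increment.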
2. FairScheduler
This side does the actual assignment. Start at case NODE_UPDATE: in FairScheduler's public void handle(SchedulerEvent event) and follow the calls down to
Resource org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable.assignContainer(FSSchedulerNode node, boolean reserved):
assignContainer(FSSchedulerNode node, boolean reserved) {
    ...
    for (Priority priority : prioritiesToTry) {
        // Count scheduling opportunities per priority for this application:
        // +1 on every scheduling attempt, reset to 0 when an attempt succeeds
        app.addSchedulingOpportunity(priority);
        ResourceRequest rackLocalRequest = app.getResourceRequest(priority, node.getRackName());
        // The application's node-local request for this node at the current priority
        ResourceRequest localRequest = app.getResourceRequest(priority, node.getNodeName());
        // allowedLocality matters: it drives the delay scheduling that follows
        NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
            scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
            scheduler.getRackLocalityThreshold());
        if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
            && localRequest != null && localRequest.getNumContainers() != 0) {
            return assignContainer(node, priority,
                localRequest, NodeType.NODE_LOCAL, reserved);
        }
        ....
    }
}
// The most important function in resource assignment: allocate if the request fits, otherwise reserve
private Resource assignContainer(FSSchedulerNode node,
    Priority priority, ResourceRequest request, NodeType type,
    boolean reserved) {
    // How much does this request need?
    Resource capability = request.getCapability();
    // How much does the node have?
    Resource available = node.getAvailableResource();
    Container container = null;
    if (reserved) {
        container = node.getReservedContainer().getContainer();
    } else {
        container = createContainer(app, node, capability, priority);
    }
    // Can we allocate a container on this node?
    if (Resources.fitsIn(capability, available)) {
        // Inform the application of the new container for this request
        RMContainer allocatedContainer =
            app.allocate(type, node, priority, request, container);
        if (allocatedContainer == null) {
            // Did the application need this resource?
            if (reserved) {
                unreserve(priority, node);
            }
            return Resources.none();
        }
        // If we had previously made a reservation, delete it
        if (reserved) {
            unreserve(priority, node);
        }
        // Inform the node
        node.allocateContainer(app.getApplicationId(),
            allocatedContainer);
        return container.getResource();
    } else {
        // The desired container won't fit here, so reserve
        reserve(priority, node, container, reserved);
        return FairScheduler.CONTAINER_RESERVED;
    }
}
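The allocate-or-reserve decision above can be reduced to a small model. This is a sketch under stated assumptions, not the scheduler's code: Res, ToyNode, Outcome and FitOrReserve are hypothetical names, resources have just two dimensions (memory and vcores), a node holds at most one reservation, and the case where app.allocate returns null is left out.

```java
// Hypothetical two-dimensional resource.
class Res {
    final int memoryMb;
    final int vcores;

    Res(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }

    // True only if every dimension of this resource fits inside `other`.
    boolean fitsIn(Res other) {
        return memoryMb <= other.memoryMb && vcores <= other.vcores;
    }
}

// Hypothetical node with free capacity and an optional reservation.
class ToyNode {
    Res available;
    Res reserved; // null when nothing is reserved

    ToyNode(Res available) {
        this.available = available;
    }
}

enum Outcome { ALLOCATED, RESERVED }

class FitOrReserve {
    static Outcome assign(ToyNode node, Res capability) {
        if (capability.fitsIn(node.available)) {
            // Fits: deduct from the node and drop any earlier reservation.
            node.available = new Res(node.available.memoryMb - capability.memoryMb,
                                     node.available.vcores - capability.vcores);
            node.reserved = null;
            return Outcome.ALLOCATED;
        }
        // Does not fit: hold the node for this request until space frees up.
        node.reserved = capability;
        return Outcome.RESERVED;
    }

    public static void main(String[] args) {
        ToyNode node = new ToyNode(new Res(4096, 4));
        System.out.println(assign(node, new Res(3072, 2))); // prints ALLOCATED
        System.out.println(assign(node, new Res(2048, 1))); // prints RESERVED
    }
}
```

In the second call only 1024 MB is left on the node, so the 2048 MB request fails the memory check even though a vcore is free. The sketch then records a reservation instead of rejecting the request.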
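On the NM side, the containers held in the NM context record the information for all containers.
Also, newer versions express the scheduling delay as a count of scheduling opportunities, whereas older versions used elapsed time. In my view the two approaches are much the same.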
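A count-based delay can be sketched as below. This is a simplified model loosely following the idea behind getAllowedLocalityLevel, not a copy of it: Locality and DelayTracker are hypothetical names, the thresholds are assumed to be fractions of the cluster size, and a single tracker stands in for one priority of one application.

```java
enum Locality { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

// Hypothetical tracker for one (application, priority) pair.
class DelayTracker {
    private Locality allowed = Locality.NODE_LOCAL;
    private int opportunities = 0;

    // Called each time a node is offered to this application.
    void addSchedulingOpportunity() {
        opportunities++;
    }

    // Called after a successful assignment.
    void resetSchedulingOpportunities() {
        opportunities = 0;
    }

    // Relax the locality level one step once the number of missed
    // opportunities exceeds numNodes * threshold for the current level.
    Locality allowedLevel(int numNodes, double nodeThreshold, double rackThreshold) {
        if (nodeThreshold < 0.0 || rackThreshold < 0.0) {
            return Locality.OFF_SWITCH; // delay scheduling disabled in this model
        }
        if (allowed == Locality.OFF_SWITCH) {
            return allowed; // already as relaxed as it gets
        }
        double threshold = (allowed == Locality.NODE_LOCAL) ? nodeThreshold : rackThreshold;
        if (opportunities > numNodes * threshold) {
            allowed = (allowed == Locality.NODE_LOCAL) ? Locality.RACK_LOCAL : Locality.OFF_SWITCH;
            opportunities = 0;
        }
        return allowed;
    }
}

class DelayTrackerDemo {
    public static void main(String[] args) {
        DelayTracker tracker = new DelayTracker();
        for (int i = 0; i < 6; i++) {
            tracker.addSchedulingOpportunity();
        }
        // 6 missed opportunities > 10 nodes * 0.5, so one step is relaxed.
        System.out.println(tracker.allowedLevel(10, 0.5, 0.5)); // prints RACK_LOCAL
    }
}
```

Counting opportunities ties the wait to how many nodes have been offered rather than to wall-clock time, so in this model the delay scales with cluster size and heartbeat rate.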
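In this way the RM assigns resources through FairScheduler on one side and returns the allocation results through ApplicationMasterService on the other, which completes the whole flow.
One open question remains: after a rack-local request has been satisfied, how does the AM find out?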