`
cloudeagle_bupt
  • 浏览: 540551 次
文章分类
社区版块
存档分类
最新评论

MapReduce任务创建和分配流程

 
阅读更多

这篇文章写的不错 http://blog.csdn.net/jackydai987/article/details/6227365

总结下主要流程:

1. JobClient.runJob()

根据用户设置的InputFormat类将输入数据进行切分,将相应的信息放在job.jar,job.splitjob.xml这三个文件中并存入HDFS.

2.:JobTracker.submitJob()

创建新的JIP对象,其初始化时会将job.jar,job.splitjob.xml这三个文件存放在本地文件系统的临时目录中。经过监听器等的一系列操作,JT.JobInitThread 最终调用该JIP的initTasks()函数进行初始化。

3. initTasks()

这里有四个关键数据结构:

// NetworkTopology Node to the set of TIPs
Map<Node, List<TaskInProgress>> nonRunningMapCache;


// Map of NetworkTopology Node to set of running TIPs
Map<Node, Set<TaskInProgress>> runningMapCache;


// A list of non-local, non-running maps
final List<TaskInProgress> nonLocalMaps;


// A set of non-local running maps

Set<TaskInProgress> nonLocalRunningMaps;


首先根据获得的split数目创建相应的TIP对象, 并通过createCache初始化nonRunningMapCache来建立hostMap,即节点和任务之间的关系。至于不具备数据本地性任务的TIP, 放入nonLoalMaps中.

建立关系时,主要按照由近及远,由本节点到rackNode的流程。将该任务分别加入该node节点和其父节点(rack node), 由于maxlevel默认为2,因此一般nonRunningMapCache中存放的就是节点本地性任务和机架本地性任务。


  private Map<Node, List<TaskInProgress>> createCache(
                                 TaskSplitMetaInfo[] splits, int maxLevel)
                                 throws UnknownHostException {
    Map<Node, List<TaskInProgress>> cache = 
      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
    Set<String> uniqueHosts = new TreeSet<String>();
    for (int i = 0; i < splits.length; i++) {
      String[] splitLocations = splits[i].getLocations();
      if (splitLocations == null || splitLocations.length == 0) {
        nonLocalMaps.add(maps[i]);
        continue;
      }

      for(String host: splitLocations) {
        Node node = jobtracker.resolveAndAddToTopology(host);
        uniqueHosts.add(host);
        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
        for (int j = 0; j < maxLevel; j++) {
          List<TaskInProgress> hostMaps = cache.get(node);
          if (hostMaps == null) {
            hostMaps = new ArrayList<TaskInProgress>();
            cache.put(node, hostMaps);
            hostMaps.add(maps[i]);
          }
          //check whether the hostMaps already contains an entry for a TIP
          //This will be true for nodes that are racks and multiple nodes in
          //the rack contain the input for a tip. Note that if it already
          //exists in the hostMaps, it must be the last element there since
          //we process one TIP at a time sequentially in the split-size order
          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
            hostMaps.add(maps[i]);
          }
          node = node.getParent();
        }
      }
    }
    
    // Calibrate the localityWaitFactor - Do not override user intent!
    if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
      int jobNodes = uniqueHosts.size();
      int clusterNodes = jobtracker.getNumberOfUniqueHosts();
      
      if (clusterNodes > 0) {
        localityWaitFactor = 
          Math.min((float)jobNodes/clusterNodes, localityWaitFactor);
      }
      LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
    }
    
    return cache;
  }


4. 任务分配时

本地性任务的区分靠findNewMapTask的参数maxLevel来区分,maxLevel=1时调度Node Local, 2时NodeOrRackLocal,

主要分配函数为scheduleMap, 其主要作用是将从nonRunningMapCache中找到的符合条件的TIP(本地或者是非本地),取出放入runningMapCache中,即可。

至于不具备数据本地性任务的TIP,同理从nonLocalMaps中找到相应任务, 放入nonLocalRunningMaps中。

  protected synchronized void scheduleMap(TaskInProgress tip) {
    
    if (runningMapCache == null) {
      LOG.warn("Running cache for maps is missing!! " 
               + "Job details are missing.");
      return;
    }
    String[] splitLocations = tip.getSplitLocations();

    // Add the TIP to the list of non-local running TIPs
    if (splitLocations == null || splitLocations.length == 0) {
      nonLocalRunningMaps.add(tip);
      return;
    }

    for(String host: splitLocations) {
      Node node = jobtracker.getNode(host);

      for (int j = 0; j < maxLevel; ++j) {
        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
        if (hostMaps == null) {
          // create a cache if needed
          hostMaps = new LinkedHashSet<TaskInProgress>();
          runningMapCache.put(node, hostMaps);
        }
        hostMaps.add(tip);
        node = node.getParent();
      }
    }
  }


以上是任务的创建和本地性分配,非本地性任务的分配流程有时间在描述。



分享到:
评论

相关推荐

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

    mapreduce编程说明和程序流程

    mapreduce编程说明和程序流程,分布式开发底层技术

    论文研究-一种异构环境下的基于MapReduce任务调度改进机制.pdf

    针对在异构环境下采用现有MapReduce任务调度机制可能出现各计算节点间数据迁移和系统资源分配难以管理的问题, 提出一种动态的任务调度机制来改善这些问题。该机制先根据节点的计算能力按比例放置数据, 然后通过资源...

    mapreduce详细流程

    最详细的mapreduce流程图和说明,包含每一步的排序、归并等

    在Hadoop的MapReduce任务中使用C程序的三种方法

    Hadoop是一个主要由Java语言开发的项目,基于Hadoop的MapReduce程序也主要是使用Java语言来编写。...经过调研,在MapReduce任务中使用C++程序的方法主要有三种:Hadoop Streaming、Hadoop Pipes以及Hadoop JNI。

    MapReduce详细流程

    里边就一张图,显示了MapReduce的详细流程,还算是比较实用。

    Hadoop原理与技术MapReduce实验

    (2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...

    mapreduce mapreduce mapreduce

    mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...

    Mapreduce实验报告.doc

    MapWorkerSTATE[] ReduceWorkerSTATE[] //各个工作机器的忙闲状态 FileSplit(string inputfilename) //输入文件切割 JobAssign() //工作任务分配 2. Map:主要功能是读取经过切割split文件形成一个map任务,分析...

    23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化

    23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化 网址:https://blog.csdn.net/chenwewi520feng/article/details/130457270 本文介绍在hadoop集群中,不适用默认的参数情况下,yarn的cpu和内容配置...

    Hadoop介绍,HDFS和MapReduce工作原理

    Hadoop介绍,HDFS和MapReduce工作原理

    实验项目 MapReduce 编程

    SecondaryNameNode、ResourceManager、NodeManager 和 JobHistoryServer。 2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 Eclipse。 3. 查看 Hadoop 自带的 MR-App 单词计数源代码 WordCount.java,在 Eclipse ...

    用于期限约束的MapReduce任务调度算法

    用于期限约束的MapReduce任务调度算法

    MapReduce执行流程

    执行reduce任务,将任务输出保存到HDFS若对流程细节进行深究,可以得到这样一张流程图从生命周期的角度来看,mapreduce流程大概经历这样几个阶段:初始化、分配、执行、反馈、成功与失败的后续处理每个阶段所做的...

    MapReduce作业运行流程

    TaskTracker:定期与JobTracker通信,执行Map和Reduce任务 HDFS:保存作业的数据、配置、jar包、结果 作业运行流程 1.在客户端启动一个作业。 2.向JobTracker请求一个Job ID。 3.将运行作业所需要的资源文件复制到...

    MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组...首先,在本地创建 3 个文件:file00l、file002 和 file003,文件具体

    大数据平台构建:MapReduce的重要概念.pptx

    大数据场景当中我们处理的数据量非常之大,任务也很重,于是我们通常采用分治的思想,把这样一个大任务拆分为数个本质相同却又互相独立的小任务(就像将一个大面包切分成多块小的面包),这些小任务同时进行计算,后...

    Hadoop中MapReduce基本案例及代码(五)

    下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...

    MapReduce技术流程介绍

    文档详细介绍了Mapreduce的开发流程,及开发过程中常见的一些问题,在我的编程过程中这些问题困扰了我很长时间,希望把它记录下来。

    MapReduce发明人关于MapReduce的介绍

    MapReduce发明人关于MapReduce的介绍

Global site tag (gtag.js) - Google Analytics