`
cloudeagle_bupt
  • 浏览: 538809 次
文章分类
社区版块
存档分类
最新评论

Apache Tez DAG计算应用框架

 
阅读更多

转自:http://zcdeng.iteye.com/blog/1897208


1. Tez简介

Tez是基于Hadoop Yarn之上的DAG(有向无环图,Directed Acyclic Graph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可以减少任务的运行时间。

2. DAG计算模型

Map/Reduce不能解决所有问题,它适合在分布式环境中处理那些海量数据批处理计算程序,其计算模型主要分为两阶段:第一阶段为Map阶段,输出的是<Key, Value>Pair对;再进行数据的Shuffle和Sort;然进入第二阶段Reduce阶段,在这一阶段就是对<Key, Value>对的计算逻辑处理。但是它无法更好地完成要求更高的计算任务,例如图计算中需要BSP迭代计算框架,要把上一个Map/Reduce任务的输出用于下一个Map/Reduce任务的输入;类似Hive和Pig的交互式有向图计算。DAG计算模型是针对Map/Reduce所遇问题而提出来的一种计算模型。下图是Map/Reduce模型与DAG模型的差别。


从图中可以看出:当采用Map/Reduce模型,我们处理一个大任务时需要四个Map/Reduce,那么就需要四个小Job来组合成一个大Job,这样会多几次的输入输出消耗。而采用Tez,它们形成一个大任务,这些子任务组合成一张DAG图,在数据的处理中间过程中,就没有往hdfs里面写数据,而是直接向它的后继节点输出数据。

3. Tez框架实现

在其中一篇技术博客Hadoop Yarn解决多类应用兼容方法讲到在Yarn上如何兼容各类应用的思路。在Hadoop Yarn上实现Hama BSP计算应用博文中讲解了如何在Yarn上开发出一个自己的应用。在这里,我将着重讲解在Tez应用的代码结构上,它是如何实现一个DAG计算模型。

从前面的博文中提到,对每个应用都需要去实现一个YARNRunner类去提交c对应的Job。在Tez里面,有一个这样的类org.apache.tez.mapreduce.YARNRunner。我们将以这个类为入口,讲解Tez的实现过程。

如下是Tez YARNRunner提交任务的实现代码。

Java代码收藏代码
  1. @Override
  2. publicJobStatussubmitJob(JobIDjobId,StringjobSubmitDir,Credentialsts)
  3. throwsIOException,InterruptedException{
  4. //与MR应用一样,先向RM获得一个applicationID。
  5. ApplicationIdappId=resMgrDelegate.getApplicationId();
  6. FileSystemfs=FileSystem.get(conf);
  7. //Loadsthejob.xmlwrittenbytheuser.
  8. JobConfjobConf=newJobConf(newTezConfiguration(conf));
  9. //ExtractindividualrawMRconfigs.
  10. //为每个stage创建它自己的conf文件
  11. Configuration[]stageConfs=MultiStageMRConfToTezTranslator
  12. .getStageConfs(jobConf);
  13. //TransformallconfstouseTezkeys
  14. MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
  15. null);
  16. for(inti=1;i<stageConfs.length;i++){
  17. MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
  18. stageConfs[i-1]);
  19. }
  20. //createinputstotezClient.submit()
  21. //FIXMEsetupjobresources
  22. Map<String,LocalResource>jobLocalResources=
  23. createJobLocalResources(stageConfs[0],jobSubmitDir);
  24. //FIXMEcreateDAGshouldtakethetezConfasaparameter,insteadofusing
  25. //MRkeys.
  26. //创建它的一个DAG图
  27. DAGdag=createDAG(fs,jobId,stageConfs,jobSubmitDir,ts,
  28. jobLocalResources);
  29. //略去...,创建一堆与Appmaster相关的conf配置,用于启动Tez的appmaster所用
  30. //SubmittoResourceManager
  31. try{
  32. PathappStagingDir=fs.resolvePath(newPath(jobSubmitDir));
  33. //向集群提交DAG任务
  34. dagClient=tezClient.submitDAGApplication(
  35. appId,
  36. dag,
  37. appStagingDir,
  38. ts,
  39. jobConf.get(JobContext.QUEUE_NAME,
  40. YarnConfiguration.DEFAULT_QUEUE_NAME),
  41. vargs,
  42. environment,
  43. jobLocalResources,dagAMConf);
  44. }catch(TezExceptione){
  45. thrownewIOException(e);
  46. }
  47. returngetJobStatus(jobId);
  48. }
上面的代码之中可以看出,它需要为该任务构造一个DAG图。下面是org.apache.tez.mapreduce.YARNRunner.createDAG(FileSystem, JobID, Configuration[], String, Credentials, Map<String, LocalResource>)的源码实现。
Java代码收藏代码
  1. privateDAGcreateDAG(FileSystemfs,JobIDjobId,Configuration[]stageConfs,
  2. StringjobSubmitDir,Credentialsts,
  3. Map<String,LocalResource>jobLocalResources)throwsIOException{
  4. //为DAG任务命名
  5. StringjobName=stageConfs[0].get(MRJobConfig.JOB_NAME,
  6. YarnConfiguration.DEFAULT_APPLICATION_NAME);
  7. DAGdag=newDAG(jobName);
  8. LOG.info("Numberofstages:"+stageConfs.length);
  9. TaskLocationHint[]mapInputLocations=getMapLocationHintsFromInputSplits(
  10. jobId,fs,stageConfs[0],jobSubmitDir);
  11. TaskLocationHint[]reduceInputLocations=null;
  12. //各个子任务subtask的初始化
  13. Vertex[]vertices=newVertex[stageConfs.length];//构造task节点
  14. for(inti=0;i<stageConfs.length;i++){
  15. vertices[i]=createVertexForStage(stageConfs[i],jobLocalResources,
  16. i==0?mapInputLocations:reduceInputLocations,i,
  17. stageConfs.length);
  18. }
  19. for(inti=0;i<vertices.length;i++){
  20. dag.addVertex(vertices[i]);//向dag中添加任务节点
  21. if(i>0){
  22. EdgePropertyedgeProperty=newEdgeProperty(
  23. ConnectionPattern.BIPARTITE,SourceType.STABLE,
  24. newOutputDescriptor(OnFileSortedOutput.class.getName(),null),
  25. newInputDescriptor(ShuffledMergedInput.class.getName(),null));
  26. Edgeedge=null;
  27. edge=newEdge(vertices[i-1],vertices[i],edgeProperty);
  28. dag.addEdge(edge);//向DAG图中添加边的属性
  29. }
  30. }
  31. returndag;
  32. }
大任务的DAG计算信息都存储在Vertex和Edge里面。我们将在这里详细分析Vertex和Edge的关系。
下面是向RM提交的任务信息,用于启动tez appmaster。appmaster的启动类为org.apache.tez.dag.app.DAGAppMaster。
Java代码收藏代码
  1. privateApplicationSubmissionContextcreateApplicationSubmissionContext(
  2. ApplicationIdappId,DAGdag,PathappStagingDir,Credentialsts,
  3. StringamQueueName,StringamName,List<String>amArgs,
  4. Map<String,String>amEnv,Map<String,LocalResource>amLocalResources,
  5. TezConfigurationamConf)throwsIOException,YarnException{
  6. //省略一些配置参数及方法(conf配置,环境变量classpath参数和appmasterJava命令)...
  7. //emitprotobufDAGfilestyle
  8. PathbinaryPath=newPath(appStagingDir,
  9. TezConfiguration.TEZ_AM_PLAN_PB_BINARY+"."+appId.toString());
  10. amConf.set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,binaryPath.toUri()
  11. .toString());
  12. ConfigurationfinalAMConf=createFinalAMConf(amConf);
  13. DAGPlandagPB=dag.createDag(finalAMConf);//用dag构建一个DAGPlan作业计划
  14. FSDataOutputStreamdagPBOutBinaryStream=null;
  15. try{
  16. //binaryoutput
  17. dagPBOutBinaryStream=FileSystem.create(fs,binaryPath,
  18. newFsPermission(TEZ_AM_FILE_PERMISSION));
  19. dagPB.writeTo(dagPBOutBinaryStream);//并且写到硬盘上
  20. }finally{
  21. if(dagPBOutBinaryStream!=null){
  22. dagPBOutBinaryStream.close();
  23. }
  24. }
  25. //省略localResources的配置信息...
  26. //SetupContainerLaunchContextforAMcontainer
  27. ContainerLaunchContextamContainer=
  28. ContainerLaunchContext.newInstance(localResources,environment,
  29. vargsFinal,null,securityTokens,acls);
  30. //SetuptheApplicationSubmissionContext
  31. ApplicationSubmissionContextappContext=Records
  32. .newRecord(ApplicationSubmissionContext.class);
  33. appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
  34. appContext.setApplicationId(appId);
  35. appContext.setResource(capability);
  36. appContext.setQueue(amQueueName);
  37. appContext.setApplicationName(amName);
  38. appContext.setCancelTokensWhenComplete(conf.getBoolean(
  39. TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
  40. TezConfiguration.DEFAULT_TEZ_AM_CANCEL_DELEGATION_TOKEN));
  41. appContext.setAMContainerSpec(amContainer);
  42. returnappContext;
  43. }
4. Vertex & Edge
<续>
5. MapReduce
<续>

分享到:
评论

相关推荐

    Apache TEZ部署手册

    Apache TEZ 部署手册 的各个步骤,包括打包等步骤说明

    为Apache Hadoop、Spark以及Tez等大数据计算框架集成.zip

    hadoop-cos(CosN文件系统)为Apache Hadoop、Spark以及Tez等大数据计算框架集成提供支持,可以像访问HDFS一样读写存储在腾讯云COS上的数据。同时也支持作为Druid等查询与分析引擎的Deep Storage. 各领域数据集,...

    Bikas Saha:Apache Tez-A框架模型和构建Hadoop数据处理应用程序

    该文档来自于Apache Hadoop和Tez项目PMC成员Bikas Saha,在2014中国大数据技术大会大数据技术分论坛的演讲“Apache Tez-A Framework to Model and Build Hadoop Data Processing Applications”。

    apache-tez-0.9.2-bin.tar.gz

    apache tez 安装

    hadoop-cos(CosN文件系统)为Apache Hadoop、Spark以及Tez等大数据计算框架集成提供支.zip

    hadoop-cos(CosN文件系统)为Apache Hadoop、Spark以及Tez等大数据计算框架集成提供支.zip

    07.新一代计算框架Tez.pptx.pptx

    Tez是Apache开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output

    apache-tez-0.9.0-bin.tar.gz

    Tez是Apache开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、...

    apache-tez-0.8.3-src.tar.gz

    Tez是Apache开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、...

    tez:Apache Tez

    阿帕奇·特兹(Apache Tez) ... 数据处理应用程序的主机,通过它可以将上述任意数据处理“任务”组合到任务DAG中,以根据需要处理数据。 通用主机被实现为Apache Hadoop YARN ApplicationMaster。

    源码apache-tez-0.8.3编译后的hadoop2.7.3版本hive-tez包tez-0.8.3.tar.gz

    源码使用的是apache-tez-0.8.3,对应的hadoop版本2.7.3,源码包中的nodejs的版本是v0.12.3,很难编译通过,最后把nodejs改成了v4.0.0才编译通过tez-ui2模块。

    apache-tez-0.10.2-src.tar.gz

    Tez是Apache最新的支持DAG作业的开源计算框架,它可以将多个有依赖的作业转换为一个作业从而大幅提升DAG作业的性能。Tez并不直接面向最终用户——事实上它允许开发者为最终用户构建性能更快、扩展性更好的应用程序。...

    apache-tez-0.10.2-bin.tar.gz

    Tez是Apache最新的支持DAG作业的开源计算框架,它可以将多个有依赖的作业转换为一个作业从而大幅提升DAG作业的性能。Tez并不直接面向最终用户——事实上它允许开发者为最终用户构建性能更快、扩展性更好的应用程序。...

    apache-tez-0.9.2-src.tar.gz

    Tez是Apache最新的支持DAG作业的开源计算框架,它可以将多个有依赖的作业转换为一个作业从而大幅提升DAG作业的性能。Tez并不直接面向最终用户——事实上它允许开发者为最终用户构建性能更快、扩展性更好的应用程序。...

    apache-tez-0.10.1-src.tar.gz

    Tez是Apache最新的支持DAG作业的开源计算框架,它可以将多个有依赖的作业转换为一个作业从而大幅提升DAG作业的性能。Tez并不直接面向最终用户——事实上它允许开发者为最终用户构建性能更快、扩展性更好的应用程序。...

    apache-tez源码

    学习大数据的小白用得到的apache tez源码,通过源码可以感受大牛们的代码风格和思维逻辑

    apache-tez-0.8.5-bin.tar.gz

    只支持 hadoop2.6以上,hive必须是apache版本的 如果使用CDH的hive需要处理jar包冲突问题,太麻烦了

    Apache Tez

    基于YARN来将数据流构建为一个DAG,能够更好地优化程序的执行过程。应该比基于MapReduce引擎的程序效率和通用性都提高了。

    apache-tez-0.9.1-bin.tar.gz

    用Hive直接编写MR程序,假设有四个有依赖关系的MR作业,上图中,绿色是Reduce Task,云状表示写屏蔽,...Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能

    docker-hive-on-tez:在 Tez 上运行的 Apache Hive 的 Docker 镜像

    在 Tez 上运行 Apache Hive 的 Docker 镜像此存储库包含一个 docker 文件,用于构建 docker 映像以在 Tez 上运行 Apache Hive。 这个 docker 文件依赖于我的其他包含和 基础镜像的存储库。当前版本Apache Hive(主干...

    源码apache-tez-0.8.3编译后的hadoop2.8.3版本hive-tez包tez-0.8.3.tar.gz

    源码使用的是apache-tez-0.8.3,对应的hadoop版本2.8.3,源码包中的nodejs的版本是v0.12.3,很难编译通过,最后把nodejs改成了v4.0.0才编译通过tez-ui2模块。

Global site tag (gtag.js) - Google Analytics