开源中文网

您的位置: 首页 > Scala > 正文

spark 核心作业调度和任务调度

来源: cnblogs  作者: 混沌战神阿瑞斯

摘要:

  1. 基本概念

    1.1   Application

    1.2    Job

    1.3   Stage

    1.4   TaskSet

    1.5   Task

    1.6   DAG

      2.相关的类

    2.1DAGScheduler

    2.2ActiveJob

    2.3Stage

    2.4Task

  3.运行方式

  4.工作流程

    4.1划分Stage

    4.2生成Job,提交Stage

    4.3任务集的提交

    4.4任务作业完成状态的监控

    4.5任务结果的获取

内容总结:

  1. 基本概念

  首先,我们先列出任务调度涉及的相关概念:

  •   Application:由多个作业组成的Spark应用程序
  •   Job:由RDD Action产生的多个stage所组成的一次计算操作
  •   Stage:任务集所对应的调度阶段
  •   TaskSet:这是一组彼此之间有关联,但是互相不存在shuffle依赖的任务所组成的任务集
  •   Task:单个分区数据集上的处理流程单元
  •   DAG:有向无环图

  2. 相关的类:

  

   2.1作业调度(DAGScheduler)

   DAGScheduler 是基于stage的高级调度(逻辑调度),这个调度会计算每个Job对应的stage的DAG。然后然后以TaskSet的形式提交stage到底层的TaskScheduler.

   

  Spark的stages 是以shuffle为边界切分RDD图来创建的。具有窄依赖(例:map(),filter())的操作会在对应stage的一系列任务中管道式的运行,但是具有宽依赖的操作则需要多个stage.最后所有的stage之间将只有shuffle依赖关系。

  实际上这些操作发生在RDD.compute(),在各个RDD的实现上,比如MappedRDD,FilteredRDD等

   

  

  下面提到还一些概念:

  •    Jobs 是以ActiveJob类代表的,以下是ActiveJob的类签名

   

  ActiveJob 可以根据finalStage区分为两种:a result job(对应ResultStage)或者a map-stage job(对应ShuffleMapStage)。

  • Stage:以下是stage的类签名

  

  • task:也相应对应两个类:ShuffleMapTask和ResultTask,类签名如下:

    

      一个stage有若干个ShuffleMapTask和最后的一个任务ResultTasks组成,其中前者执行任务并将输出写入分区;后者执行任务将输出发送到驱动程序中(Driver Application)

  •   Cache tracking:在内存就从内存中去,否则就记住以及计算出来的map stages
  •  Preferred locations
  •  Cleanup:当Job结束时,所有依赖的数据结构也会被clear

  类签名:

  

   3.运行方式:

  DAGScheduler在SparkContext初始化过程中实例化,一个SparkContext对应一个DAGScheduler,DAGScheduler的事件循环逻辑基于Akka Actor的消息传递机制来构建,在DAGScheduler的Start函数中创建了一个eventProcessActor用来处理各种   DAGSchedulerEvent,这些事件包括作业的提交,任务状态的变化,监控等等

  DAGScheduler最重要的任务之一就是计算作业和任务的依赖关系,制定调度逻辑

  DAGScheduler作业调度的两个主要入口是submitJob 和 runJob,两者的区别在于前者返回一个Jobwaiter对象,可以用在异步调用中,用来判断作业完成或者取消作业,runJob在内部调用submitJob,阻塞等待直到作业完成(或失败)

  具体往DAGScheduler提交作业的操作,基本都是封装在RDD的相关Action操作里面,不需要用户显式的提交作业

   用户代码都是基于RDD的一系列计算操作,实际运行时,这些计算操作是Lazy执行的,并不是所有的RDD操作都会触发Spark往Cluster上提交实际作业,基本上只有一些需要返回数据或者向外部输出的操作才会触发实际计算工作,其它的变换操作基本上只是生成对应的RDD记录依赖关系。

   DAGScheduler内部维护了各种 task / stage / job之间的映射关系表

   3.1 DAGScheduler在SparkContext初始化过程中实例化,SparkContext类的相关代码

  

   3.2DAGScheduler的taskStarted函数中创建了一个eventProcessActor用来处理各种DAGSchedulerEvent

    

  3.3下面贴一个运行流程图:

  

 

  4.工作流程

   4.1划分Stage

   当某个操作触发计算,向DAGScheduler提交作业时,DAGScheduler需要从RDD依赖链最末端的RDD出发,遍历整个RDD依赖链,划分Stage任务阶段,并决定各个Stage之间的依赖关系。Stage的划分是以ShuffleDependency为依据的,也就是说当某个RDD的运算需要将数据进行Shuffle时,这个包含了Shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的Stage,由此为依据划分Stage,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。

   以GroupByKey操作为例,该操作返回的结果实际上是一个ShuffleRDD,当DAGScheduler遍历到这个ShuffleRDD的时候,因为其Dependency是一个ShuffleDependency,于是这个ShuffleRDD的父RDD以及shuffleDependency等对象就被用来构建一个新的Stage,这个Stage的输出结果的分区方式,则由ShuffleDependency中的Partitioner对象来决定。

   可以看到,尽管划分和构建Stage的依据是ShuffleDependency,对应的RDD也就是这里的ShuffleRDD,但是这个Stage所处理的数据是从这个shuffleRDD的父RDD开始计算的,只是最终的输出结果的位置信息参考了ShuffleRDD返回的ShuffleDependency里所包含的内容。而shuffleRDD本身的运算操作(其实就是一个获取shuffle结果的过程),是在下一个Stage里进行的。

  贴一张图:

  

 

  4.2生成Job,提交Stage

   上一个步骤得到一个或多个有依赖关系的Stage,其中直接触发Job的RDD所关联的Stage作为FinalStage生成一个Job实例,这两者的关系进一步存储在resultStageToJob映射表中,用于在该Stage全部完成时做一些后续处理,如报告状态,清理Job相关数据等。具体提交一个Stage时,首先判断该Stage所依赖的父Stage的结果是否可用,如果所有父Stage的结果都可用,则提交该Stage,如果有任何一个父Stage的结果不可用,则迭代尝试提交父Stage。 所有迭代过程中由于所依赖Stage的结果不可用而没有提交成功的Stage都被放到waitingStages列表中等待将来被提交

   什么时候waitingStages中的Stage会被重新提交呢,当一个属于中间过程Stage的任务(这种类型的任务所对应的类为ShuffleMapTask)完成以后,DAGScheduler会检查对应的Stage的所有任务是否都完成了,如果是都完成了,则DAGScheduler将重新扫描一次waitingStages中的所有Stage,检查他们是否还有任何依赖的Stage没有完成,如果没有就可以提交该Stage。

  此外每当完成一次DAGScheduler的事件循环以后,也会触发一次从等待和失败列表中扫描并提交就绪Stage的调用过程

  下面是submitStage的代码:

  

  4.3任务集的提交

   每个Stage的提交,最终是转换成一个TaskSet任务集的提交,DAGScheduler通过TaskScheduler接口提交TaskSet这个TaskSet最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个TaskSet的生命周期,对于DAGScheduler来说提交Stage的工作到此就完成了。而TaskScheduler的具体实现则会在得到计算资源的时候,进一步通过TaskSetManager调度具体的Task到对应的Executor节点上进行运算

  4.4任务作业完成状态的监控

   要保证相互依赖的job/stage能够得到顺利的调度执行,DAGScheduler就必然需要监控当前Job / Stage乃至Task的完成情况。这是通过对外(主要是对TaskScheduler)暴露一系列的回调函数来实现的,对于TaskScheduler来说,这些回调函数主要包括任务的开始结束失败,任务集的失败,DAGScheduler根据这些Task的生命周期信息进一步维护Job和Stage的状态信息。

   此外TaskScheduler还可以通过回调函数通知DAGScheduler具体的Executor的生命状态,如果某一个Executor崩溃了,或者由于任何原因与Driver失去联系了,则对应的Stage的shuffleMapTask的输出结果也将被标志为不可用,这也将导致对应Stage状态的变更,进而影响相关Job的状态,再进一步可能触发对应Stage的重新提交来重新计算获取相关的数据。

   4.5任务结果的获取

   一个具体的任务在Executor中执行完毕以后,其结果需要以某种形式返回给DAGScheduler,根据任务类型的不同,任务的结果的返回方式也不同

   对于FinalStage所对应的任务(对应的类为ResultTask)返回给DAGScheduler的是运算结果本身,而对于ShuffleMapTask,返回给DAGScheduler的是一个MapStatus对象,MapStatus对象管理了ShuffleMapTask的运算输出结果在BlockManager里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个Stage的任务的获取输入数据的依据

   而根据任务结果的大小的不同,ResultTask返回的结果又分为两类,如果结果足够小,则直接放在DirectTaskResult对象内,如果超过特定尺寸(默认约10MB)则在Executor端会将DirectTaskResult先序列化,再把序列化的结果作为一个Block存放在BlockManager里,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并通过BlockManager最终取得对应的DirectTaskResult。当然从DAGScheduler的角度来说,这些过程对它来说是透明的,它所获得的都是任务的实际运算结果。

Tags:核心 任务
相关文章列表:
关于开源中文网 - 联系我们 - 广告服务 - 网站地图 - 版权声明