Listing 5-11 Implementation of the partitions method
- final def partitions: Array[Partition] = {
- checkpointRDD.map(_.partitions).getOrElse {
- if (partitions_ == null) {
- partitions_ = getPartitions
- }
- partitions_
- }
- }
- final def dependencies: Seq[Dependency[_]] = {
- checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
- if (dependencies_ == null) {
- dependencies_ = getDependencies
- }
- dependencies_
- }
- }
- private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
- private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
- def checkpointRDD: Option[RDD[T]] = {
- RDDCheckpointData.synchronized {
- cpRDD
- }
- }
- var cpState = Initialized
- @transient var cpFile: Option[String] = None
- var cpRDD: Option[RDD[T]] = None
- def checkpoint() {
- if (context.checkpointDir.isEmpty) {
- throw new SparkException("Checkpoint directory has not been set in the SparkContext")
- } else if (checkpointData.isEmpty) {
- checkpointData = Some(new RDDCheckpointData(this))
- checkpointData.get.markForCheckpoint()
- }
- }
- def markForCheckpoint() {
- RDDCheckpointData.synchronized {
- if (cpState == Initialized) cpState = MarkedForCheckpoint
- }
- }
- def doCheckpoint() {
- RDDCheckpointData.synchronized {
- if (cpState == MarkedForCheckpoint) {
- cpState = CheckpointingInProgress
- } else {
- return
- }
- }
- // Create the output path for the checkpoint
- val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
- val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
- if (!fs.mkdirs(path)) {
- throw new SparkException("Failed to create checkpoint path " + path)
- }
- // Save to file, and reload it as an RDD
- val broadcastedConf = rdd.context.broadcast(
- new SerializableWritable(rdd.context.hadoopConfiguration))
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
- val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
- if (newRDD.partitions.size != rdd.partitions.size) {
- throw new SparkException(
- "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
- "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
- }
- // Change the dependencies and partitions of the RDD
- RDDCheckpointData.synchronized {
- cpFile = Some(path.toString)
- cpRDD = Some(newRDD)
- rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
- cpState = Checkpointed
- }
- logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
- }
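The state handling spread across checkpoint, markForCheckpoint, and doCheckpoint above may be easier to follow in isolation. The following is a minimal, Spark-free sketch of that lifecycle. The names CheckpointLifecycle, Tracker, mark, and run are hypothetical and do not exist in Spark; the save callback stands in for the job that writes the checkpoint files.

```scala
// Simplified model of the cpState transitions; not Spark code.
object CheckpointLifecycle {
  sealed trait State
  case object Initialized extends State
  case object MarkedForCheckpoint extends State
  case object CheckpointingInProgress extends State
  case object Checkpointed extends State

  final class Tracker {
    private var state: State = Initialized
    private val lock = new Object

    def current: State = lock.synchronized { state }

    // Mirrors markForCheckpoint: only an untouched tracker can be marked.
    def mark(): Unit = lock.synchronized {
      if (state == Initialized) state = MarkedForCheckpoint
    }

    // Mirrors doCheckpoint: returns false when nothing was marked,
    // otherwise runs `save` outside the lock and then records completion.
    def run(save: () => Unit): Boolean = {
      val proceed = lock.synchronized {
        if (state == MarkedForCheckpoint) {
          state = CheckpointingInProgress
          true
        } else {
          false
        }
      }
      if (proceed) {
        save()
        lock.synchronized { state = Checkpointed }
      }
      proceed
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker
    println(tracker.run(() => ()))                 // nothing marked yet
    tracker.mark()
    println(tracker.run(() => println("saving")))  // performs the save
    println(tracker.current)
  }
}
```

As in the listing, the lock is held only while the state is inspected or updated, so the potentially slow save step does not block other threads that merely query the state.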
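1) Check whether the user job has enabled checkpointing, that is, whether the checkpoint method has been called to set cpState to MarkedForCheckpoint. If checkpointing has not been enabled, return immediately without saving a checkpoint.
2) Create the path on HDFS where the checkpoint data will be saved. The checkpointDir must have been set by the user job through SparkContext's setCheckpointDir method (see Listing 5-67).
3) Run a job that executes CheckpointRDD's writeToFile method (see Listing 5-68), which saves the checkpoint data to HDFS.
4) Store the newly constructed CheckpointRDD in cpRDD and the checkpoint directory in cpFile, then set cpState to Checkpointed. Because the checkpoint has been saved, this RDD has been computed successfully and its dependency and partition information is no longer needed; even when a job is recovered, the data only has to be read back from the checkpoint. The RDD's markCheckpointed method (see Listing 5-69) is therefore called to clear the dependency and partition information.
Listing 5-67 Setting the path on HDFS where the job's checkpoints are saved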
- def setCheckpointDir(directory: String) {
- checkpointDir = Option(directory).map { dir =>
- val path = new Path(dir, UUID.randomUUID().toString)
- val fs = path.getFileSystem(hadoopConfiguration)
- fs.mkdirs(path)
- fs.getFileStatus(path).getPath.toString
- }
- }
- def writeToFile[T: ClassTag](
- path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
- blockSize: Int = -1
- )(ctx: TaskContext, iterator: Iterator[T]) {
- val env = SparkEnv.get
- val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(broadcastedConf.value.value)
- val finalOutputName = splitIdToFile(ctx.partitionId)
- val finalOutputPath = new Path(outputDir, finalOutputName)
- val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
- if (fs.exists(tempOutputPath)) {
- throw new IOException("Checkpoint failed: temporary path " +
- tempOutputPath + " already exists")
- }
- val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
- val fileOutputStream = if (blockSize < 0) {
- fs.create(tempOutputPath, false, bufferSize)
- } else {
- // This is mainly for testing purpose
- fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
- }
- val serializer = env.serializer.newInstance()
- val serializeStream = serializer.serializeStream(fileOutputStream)
- serializeStream.writeAll(iterator)
- serializeStream.close()
- if (!fs.rename(tempOutputPath, finalOutputPath)) {
- if (!fs.exists(finalOutputPath)) {
- logInfo("Deleting tempOutputPath " + tempOutputPath)
- fs.delete(tempOutputPath, false)
- throw new IOException("Checkpoint failed: failed to save output of task: "
- + ctx.attemptId + " and final output path does not exist")
- } else {
- // Some other copy of this task must've finished before us and renamed it
- logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
- fs.delete(tempOutputPath, false)
- }
- }
- }
- private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
- clearDependencies()
- partitions_ = null
- deps = null // Forget the constructor argument for dependencies too
- }
- protected def clearDependencies() {
- dependencies_ = null
- }
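The writeToFile method shown earlier writes each partition to a hidden per-attempt temporary file and only then renames it to its final name, so a concurrent attempt of the same task cannot leave a half-written final file. The sketch below illustrates that pattern on the local file system. It uses java.nio.file rather than the Hadoop FileSystem API, which is an assumption made so the example runs without Hadoop; AtomicPartitionWrite and writePartition are hypothetical names, and the text-based file content is purely illustrative. Note one difference: Hadoop's rename reports failure through its return value, whereas Files.move throws an exception.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Path}

// Local-file-system illustration of "write to temp file, then rename"; not Spark code.
object AtomicPartitionWrite {
  def writePartition(dir: Path, partitionId: Int, attemptId: Long, lines: Seq[String]): Path = {
    val finalName = f"part-$partitionId%05d"
    val finalPath = dir.resolve(finalName)
    // Hidden temp file that is unique per task attempt.
    val tempPath = dir.resolve("." + finalName + "-attempt-" + attemptId)
    if (Files.exists(tempPath)) {
      throw new IOException("Temporary path " + tempPath + " already exists")
    }
    Files.write(tempPath, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    try {
      // Without REPLACE_EXISTING, the move fails if the final file is already there.
      Files.move(tempPath, finalPath)
    } catch {
      case _: FileAlreadyExistsException =>
        // Another attempt finished first; keep its output and discard ours.
        Files.delete(tempPath)
    }
    finalPath
  }
}
```

Calling writePartition twice for the same partition with different attempt ids leaves the output of the first attempt in place and removes the second attempt's temporary file, which corresponds to the "not overwriting it" branch in the listing.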
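1) When an RDD's dependencies are requested and a checkpoint exists, they are read from the checkpoint.
2) When an RDD's partitions are requested and a checkpoint exists, they are read from the checkpoint.
Besides these two scenarios, there is a third that uses the RDD's checkpoint data indirectly: during computation, the RDD's computeOrReadCheckpoint method (see Listing 5-70) is called so that the saved results can be read directly from the checkpoint. How this method is used is covered in Chapter 6 in the analysis of Listing 6-1; here we look only at how it makes use of the checkpoint.
Listing 5-70 Reading computed results from a checkpoint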
- private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
- {
- if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
- }
- def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
Listing 5-71 Determining whether the RDD has saved a checkpoint
- def isCheckpointed: Boolean = {
- RDDCheckpointData.synchronized { cpState == Checkpointed }
- }
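From the earlier analysis we know that once a checkpoint has been saved, cpState must equal Checkpointed, so the isCheckpointed method returns true and Listing 5-70 goes on to execute firstParent[T].iterator(split, context). The firstParent method (see Listing 5-13) first calls the dependencies method shown in Listing 5-28, so using a checkpoint through computeOrReadCheckpoint during computation reduces to the first scenario above, in which the checkpoint is consulted when the RDD's dependencies are requested. By this point the dependencies have been replaced by the CheckpointRDD. After the iterative computation (see Chapter 6), the compute method of CheckpointRDD (see Listing 5-72) is eventually called. Its implementation shows that reading computed results from a checkpoint simply means reading back the data that was written to HDFS, as analyzed earlier.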
Listing 5-72 Reading data from the checkpoint saved on HDFS
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
- CheckpointRDD.readFromFile(file, broadcastedConf, context)
- }
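To make the read path described above concrete, the following Spark-free sketch models how a checkpoint replaces an RDD's lineage and how the iterator then reads through the single remaining parent instead of recomputing. MiniRDD, Computed, and Saved are hypothetical classes introduced only for illustration; partitions are reduced to integer indexes and the "checkpoint" is held in memory rather than on HDFS.

```scala
// Simplified model of dependencies / computeOrReadCheckpoint; not Spark code.
object CheckpointReadPath {
  abstract class MiniRDD[T](private var deps: List[MiniRDD[_]]) {
    private var checkpointRDD: Option[MiniRDD[T]] = None
    private var dependencies_ : List[MiniRDD[_]] = null

    protected def compute(partition: Int): Iterator[T]

    // Like RDD.dependencies: a checkpoint replaces the lineage with one parent.
    final def dependencies: List[MiniRDD[_]] =
      checkpointRDD.map(r => List[MiniRDD[_]](r)).getOrElse {
        if (dependencies_ == null) dependencies_ = deps
        dependencies_
      }

    def isCheckpointed: Boolean = checkpointRDD.isDefined

    // Like markCheckpointed: remember the checkpoint and forget the old lineage.
    def markCheckpointed(cp: MiniRDD[T]): Unit = {
      checkpointRDD = Some(cp)
      dependencies_ = null
      deps = null
    }

    // Like computeOrReadCheckpoint: read via the first parent once checkpointed.
    final def iterator(partition: Int): Iterator[T] =
      if (isCheckpointed) dependencies.head.asInstanceOf[MiniRDD[T]].iterator(partition)
      else compute(partition)
  }

  // Recomputes its data on every call and counts how often that happens.
  final class Computed(data: Vector[Vector[Int]]) extends MiniRDD[Int](Nil) {
    var computeCalls = 0
    protected def compute(partition: Int): Iterator[Int] = {
      computeCalls += 1
      data(partition).iterator
    }
  }

  // Stands in for CheckpointRDD: serves previously saved partitions.
  final class Saved(stored: Vector[Vector[Int]]) extends MiniRDD[Int](Nil) {
    protected def compute(partition: Int): Iterator[Int] = stored(partition).iterator
  }

  def main(args: Array[String]): Unit = {
    val data = Vector(Vector(1, 2), Vector(3))
    val rdd = new Computed(data)
    println(rdd.iterator(0).toList)       // computed by the RDD itself
    rdd.markCheckpointed(new Saved(data))
    println(rdd.iterator(1).toList)       // served by the checkpoint stand-in
    println(rdd.computeCalls)
  }
}
```

After markCheckpointed is called, the compute counter of the original object no longer increases, which reflects the point made in the text: reading through computeOrReadCheckpoint ends in the checkpoint RDD's compute method rather than in the original computation.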
