Spark Persistence and Checkpoint Internals
When working with Spark persistence, we usually persist frequently reused data with cache() or persist(). Because that data is kept in executor memory, it is lost when a node or executor goes down, and the RDD then has to be recomputed; if some upstream RDD was expensive to compute, a great deal of time is wasted. For such cases we can use checkpointing, which persists the data to a fault-tolerant file system such as HDFS. Checkpointing does cost some performance (disk I/O), but trading that cost for avoiding large amounts of repeated computation is often a net win.
After an RDD has been cached, Spark first goes through the CacheManager, which in turn asks the BlockManager for the data cached in memory or on disk. If the RDD was never cached, or its cached data has been lost, Spark falls back to the checkpoint data in the fault-tolerant file system; if that is not found either, the RDD is recomputed from its lineage.
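Conceptually, that fallback order looks like the sketch below. The trait and method names here are hypothetical stand-ins used only to illustrate the order described above; they are not Spark's actual internal API.

// Hypothetical illustration of the read path for a partition of a cached/checkpointed RDD
trait PartitionReader[T] {
  def fromBlockManager(rddId: Int, split: Int): Option[Iterator[T]]  // memory/disk cache
  def fromCheckpoint(rddId: Int, split: Int): Option[Iterator[T]]    // e.g. HDFS checkpoint files
  def recompute(rddId: Int, split: Int): Iterator[T]                 // recompute from lineage

  def read(rddId: Int, split: Int): Iterator[T] =
    fromBlockManager(rddId, split)            // 1. try the cache first
      .orElse(fromCheckpoint(rddId, split))   // 2. then the checkpoint data
      .getOrElse(recompute(rddId, split))     // 3. finally recompute from lineage
}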
How checkpointing works
1. In your code, set a checkpoint directory on the SparkContext (for example an HDFS directory) with setCheckpointDir().
2. Call the checkpoint() method on every RDD that needs to be checkpointed.
3. RDDCheckpointData (an internal Spark API) takes over that RDD and marks it as "marked for checkpointing", i.e. ready to be checkpointed.
4. After your job has run, finalRDD.doCheckpoint() is called. It walks back along the RDD lineage, and every RDD it finds that is marked for checkpointing gets a second mark, "checkpointing in progress", meaning the checkpoint operation is now underway.
5. Once the job has finished, an internal new job is launched to write the data of every RDD marked "checkpointing in progress" to HDFS files. (If the RDD was cached earlier, its data is read straight from the cache and written to HDFS; if it was not cached, the RDD is recomputed once more and then checkpointed.)
6. The checkpointed RDD's previous parent dependencies are replaced with a CheckpointRDD, forcibly rewriting the RDD's lineage. Afterwards, if the RDD's cached data cannot be obtained, Spark goes through this upstream CheckpointRDD to read the checkpointed data back from the fault-tolerant file system, e.g. HDFS (the sketch after this list shows the effect on the lineage from the application side).
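To observe steps 4-6 from application code, you can inspect the RDD before and after the first action. A minimal sketch, assuming a SparkContext named sc whose checkpoint directory has already been set (see the full example at the end of this article):

// Assumes sc is a SparkContext and sc.setCheckpointDir(...) has already been called
val data = sc.parallelize(1 to 100000).map(_ * 2).filter(_ % 3 == 0)
data.cache()        // cache so the checkpoint job can reuse the computed data (step 5)
data.checkpoint()   // steps 2-3: RDDCheckpointData marks the RDD for checkpointing

println(data.toDebugString)      // full lineage; nothing checkpointed yet
data.count()                     // first action: runs the normal job, then the internal checkpoint job

println(data.isCheckpointed)     // true once the checkpoint job has finished
println(data.getCheckpointFile)  // the directory written by RDDCheckpointData, e.g. an hdfs:// path
println(data.toDebugString)      // lineage is now rooted at a CheckpointRDD (step 6)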
The RDDCheckpointData source code is as follows:
/**
 * Enumeration to manage state transitions of an RDD through checkpointing
 * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
}
/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
 * as well as, manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Logging with Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  var cpState = Initialized

  // The file to which the associated RDD has been checkpointed to
  @transient var cpFile: Option[String] = None

  // The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD.
  var cpRDD: Option[RDD[T]] = None

  // Mark the RDD for checkpointing
  def markForCheckpoint() {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) cpState = MarkedForCheckpoint
    }
  }

  // Is the RDD already checkpointed
  def isCheckpointed: Boolean = {
    RDDCheckpointData.synchronized { cpState == Checkpointed }
  }

  // Get the file to which this RDD was checkpointed to as an Option
  def getCheckpointFile: Option[String] = {
    RDDCheckpointData.synchronized { cpFile }
  }
  // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
  def doCheckpoint() {
    // If it is marked for checkpointing AND checkpointing is not already in progress,
    // then set it to be in progress, else return
    RDDCheckpointData.synchronized {
      if (cpState == MarkedForCheckpoint) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // Create the output path for the checkpoint
    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
    // Get the file system that will hold the checkpoint files
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException("Failed to create checkpoint path " + path)
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = rdd.context.broadcast(
      new SerializableWritable(rdd.context.hadoopConfiguration))
    // Run a job that writes the RDD's data to the checkpoint files in the file system
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
    if (newRDD.partitions.size != rdd.partitions.size) {
      throw new SparkException(
        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
        "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
    }

    // Change the dependencies and partitions of the RDD
    RDDCheckpointData.synchronized {
      cpFile = Some(path.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
      cpState = Checkpointed
    }
    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
  }
  // Get preferred location of a split after checkpointing
  def getPreferredLocations(split: Partition): Seq[String] = {
    RDDCheckpointData.synchronized {
      cpRDD.get.preferredLocations(split)
    }
  }

  def getPartitions: Array[Partition] = {
    RDDCheckpointData.synchronized {
      cpRDD.get.partitions
    }
  }

  def checkpointRDD: Option[RDD[T]] = {
    RDDCheckpointData.synchronized {
      cpRDD
    }
  }
}
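For context, the user-facing checkpoint() method on RDD is what creates this RDDCheckpointData instance and calls markForCheckpoint(). It looks roughly like the following (reproduced from memory of the same era of the Spark source, so treat it as an approximation rather than a verbatim quote):

def checkpoint() {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new RDDCheckpointData(this))
    checkpointData.get.markForCheckpoint()
  }
}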
The file-writing operation in CheckpointRDD looks like this:
def writeToFile[T: ClassTag](
    path: String,
    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
    blockSize: Int = -1
  )(ctx: TaskContext, iterator: Iterator[T]) {
  val env = SparkEnv.get
  val outputDir = new Path(path)
  val fs = outputDir.getFileSystem(broadcastedConf.value.value)

  // Each task writes its partition to a hidden temporary file first and only renames it to
  // the final name on success, so retried attempts cannot corrupt the final output
  val finalOutputName = splitIdToFile(ctx.partitionId)
  val finalOutputPath = new Path(outputDir, finalOutputName)
  val tempOutputPath =
    new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)
  if (fs.exists(tempOutputPath)) {
    throw new IOException("Checkpoint failed: temporary path " +
      tempOutputPath + " already exists")
  }
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  val fileOutputStream = if (blockSize < 0) {
    fs.create(tempOutputPath, false, bufferSize)
  } else {
    // This is mainly for testing purpose
    fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
  }

  // Serialize the partition's records into the temporary file
  val serializer = env.serializer.newInstance()
  val serializeStream = serializer.serializeStream(fileOutputStream)
  serializeStream.writeAll(iterator)
  serializeStream.close()

  // Publish the result by renaming; if another attempt of this task got there first, keep its output
  if (!fs.rename(tempOutputPath, finalOutputPath)) {
    if (!fs.exists(finalOutputPath)) {
      logInfo("Deleting tempOutputPath " + tempOutputPath)
      fs.delete(tempOutputPath, false)
      throw new IOException("Checkpoint failed: failed to save output of task: "
        + ctx.attemptNumber + " and final output path does not exist")
    } else {
      // Some other copy of this task must've finished before us and renamed it
      logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
      fs.delete(tempOutputPath, false)
    }
  }
}
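The splitIdToFile helper used above simply derives the per-partition file name from the partition id; in the same CheckpointRDD object it is roughly:

// Maps a partition id to its checkpoint file name, e.g. 3 -> "part-00003"
def splitIdToFile(splitId: Int): String = {
  "part-%05d".format(splitId)
}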
In the source of the RDD class, the two relevant methods are shown below:
/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
private[spark] def doCheckpoint() {
  if (!doCheckpointCalled) {
    doCheckpointCalled = true
    if (checkpointData.isDefined) {
      checkpointData.get.doCheckpoint()
    } else {
      dependencies.foreach(_.rdd.doCheckpoint())
    }
  }
}

/**
 * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
 * created from the checkpoint file, and forget its old dependencies and partitions.
 */
private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
  clearDependencies()
  partitions_ = null
  deps = null // Forget the constructor argument for dependencies too
}
In our own applications, using checkpointing takes only two simple steps: set a checkpoint directory through SparkContext.setCheckpointDir(), and call checkpoint() on the RDDs that need to be checkpointed. (The doCheckpoint() shown above is internal; Spark invokes it for you once the job that uses the RDD has finished.) For example:
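A minimal, self-contained sketch of that pattern; the application name and HDFS paths below are placeholders:

import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark versions)
import org.apache.spark.storage.StorageLevel

object CheckpointExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-example"))

    // Step 1: set a fault-tolerant checkpoint directory (placeholder path)
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    val words = sc.textFile("hdfs:///data/input.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)

    // Cache before checkpointing so the checkpoint job reads from memory
    // instead of recomputing the whole lineage (see step 5 above)
    words.persist(StorageLevel.MEMORY_AND_DISK)

    // Step 2: mark the RDD for checkpointing; its data is written by a
    // separate internal job after the first action completes
    words.checkpoint()

    words.count()   // triggers the normal job and then the checkpoint job
    sc.stop()
  }
}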