Checkpointing is the process of writing received records (by means of input dstreams) at checkpoint intervals to a highly-available, HDFS-compatible storage. It allows for creating fault-tolerant stream processing pipelines, so that when a failure occurs input dstreams can restore the before-failure streaming state and continue stream processing (as if nothing had happened).
DStreams can checkpoint input data at specified time intervals.
You use the StreamingContext.checkpoint method to set up an HDFS-compatible checkpoint directory where checkpoint data will be persisted, as follows:
ssc.checkpoint("_checkpoint")
You can set up periodic checkpointing of a dstream every checkpoint interval using the DStream.checkpoint method.
val ssc: StreamingContext = ...
// set the checkpoint directory
ssc.checkpoint("_checkpoint")
val ds: DStream[Int] = ...
val cds: DStream[Int] = ds.checkpoint(Seconds(5))
// do something with the input dstream
cds.print
You can create a StreamingContext from a checkpoint directory, i.e. recreate a fully-working StreamingContext as recorded in the last valid checkpoint file that was written to the checkpoint directory.
Note: You can also create a brand new StreamingContext (putting checkpoints aside).
Warning: You must not create input dstreams using a StreamingContext that has been recreated from a checkpoint. Otherwise, the StreamingContext will not start at all.
When you use the StreamingContext(path: String) constructor (or one of its variants), it uses the Hadoop configuration to access the path directory on a Hadoop-supported file system. Effectively, the two variants use the StreamingContext(path: String, hadoopConf: Configuration) constructor that reads the latest valid checkpoint file.
Note: The SparkContext and batch interval are set to their corresponding values using the checkpoint file.
The following Scala code demonstrates how to use the checkpoint directory _checkpoint to (re)create the StreamingContext or create one from scratch.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val appName = "Recreating StreamingContext from Checkpoint"
val sc = new SparkContext("local[*]", appName, new SparkConf())
val checkpointDir = "_checkpoint"

def createSC(): StreamingContext = {
  val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
  // NOTE: You have to create dstreams inside the method
  // See http://stackoverflow.com/q/35090180/1305344

  // Create a constant input dstream with the RDD
  val rdd = sc.parallelize(0 to 9)
  import org.apache.spark.streaming.dstream.ConstantInputDStream
  val cis = new ConstantInputDStream(ssc, rdd)

  // Sample stream computation
  cis.print

  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createSC)

// Start streaming processing
ssc.start
DStreamCheckpointData works with a single dstream. An instance of DStreamCheckpointData is created when its dstream is created.
It tracks checkpoint data in the internal data registry that records batch time and the checkpoint data at that time. The internal checkpoint data can be anything that a dstream wants to checkpoint. DStreamCheckpointData returns the registry when its currentCheckpointFiles method is called.
Note: By default, DStreamCheckpointData records the checkpoint files to which the generated RDDs of the DStream have been saved.
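As a mental model, the registry can be thought of as a batch-time-indexed map. The following sketch is hypothetical (the names data and currentCheckpointFiles follow the text above; everything else is simplified and not the real class):

import scala.collection.mutable

// Hypothetical, simplified model of the registry inside DStreamCheckpointData:
// batch time (in ms) -> whatever the dstream checkpoints (a checkpoint file path by default)
class CheckpointDataModel {
  private val data = mutable.HashMap[Long, String]()

  def put(batchTimeMs: Long, checkpointFile: String): Unit =
    data(batchTimeMs) = checkpointFile

  // returns the registry, as described above
  def currentCheckpointFiles: Map[Long, String] = data.toMap
}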
Tip: Enable DEBUG logging for DStreamCheckpointData to see what happens inside. Refer to Logging.
update(time: Time): Unit
update collects batches and the directory names where the corresponding RDDs were checkpointed (filtering the dstream’s internal generatedRDDs mapping).
You should see the following DEBUG message in the logs:
DEBUG Current checkpoint files:
[checkpointFile per line]
The collection of the batches and their checkpointed RDDs is recorded in an internal field for serialization, i.e. it becomes the current value of the internal field currentCheckpointFiles that is serialized when requested. The collection is also added to an internal transient (non-serializable) mapping timeToCheckpointFile, and the oldest checkpoint (given batch times) is recorded in an internal transient mapping for the current time.
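A hedged sketch of that collection step, with generatedRDDs modeled as a plain map from batch time to an optional checkpoint file (the real code filters RDDs by whether they have been checkpointed):

// Illustrative only: keep the batches whose RDDs have been checkpointed
// and remember the checkpoint file of each.
def collectCheckpointFiles(
    generatedRDDs: Map[Long, Option[String]]): Map[Long, String] =
  generatedRDDs.collect { case (batchTime, Some(file)) => batchTime -> file }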
Note: update is called by DStream.updateCheckpointData(currentTime: Time).
cleanup(time: Time): Unit
cleanup deletes checkpoint files older than the oldest batch for the input time.
It first gets the oldest batch time for the input time (see Updating Collection of Batches and Checkpoint Directories (update method)).
If the (batch) time has been found, all the checkpoint files older than it are deleted (as tracked in the internal timeToCheckpointFile mapping).
You should see the following DEBUG message in the logs:
DEBUG Files to delete:
[comma-separated files to delete]
For each checkpoint file successfully deleted, you should see the following INFO message in the logs:
INFO Deleted checkpoint file '[file]' for time [time]
Errors in checkpoint deletion are reported as WARN messages in the logs:
WARN Error deleting old checkpoint file '[file]' for time [time]
Otherwise, when no (batch) time has been found for the given input time, you should see the following DEBUG message in the logs:
DEBUG Nothing to delete
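The deletion logic could be sketched as follows (illustrative names only; the real code deletes files through a Hadoop-compatible file system):

import scala.collection.mutable

// Illustrative only: delete entries (and their files) older than the oldest batch time to keep.
def cleanupOlderThan(
    timeToCheckpointFile: mutable.HashMap[Long, String],
    oldestTimeToKeep: Long)(deleteFile: String => Unit): Unit =
  timeToCheckpointFile.keys.toSeq.filter(_ < oldestTimeToKeep).foreach { t =>
    deleteFile(timeToCheckpointFile(t))
    timeToCheckpointFile.remove(t)
  }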
Note: cleanup is called by DStream.clearCheckpointData(time: Time).
restore(): Unit
restore restores the dstream’s generatedRDDs given the persistent internal data mapping with batch times and corresponding checkpoint files.
restore takes the current checkpoint files and restores checkpointed RDDs from each checkpoint file (using SparkContext.checkpointFile).
You should see the following INFO message in the logs per checkpoint file:
INFO Restoring checkpointed RDD for time [time] from file '[file]'
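Conceptually, restore is a map over the recorded (batch time, checkpoint file) pairs. The sketch below abstracts SparkContext.checkpointFile behind a function parameter rather than calling it directly:

// Illustrative only: rebuild the batch-time -> RDD mapping from checkpoint files.
def restoreGeneratedRDDs[R](
    currentCheckpointFiles: Map[Long, String])(readCheckpointedRDD: String => R): Map[Long, R] =
  currentCheckpointFiles.map { case (batchTime, file) =>
    batchTime -> readCheckpointedRDD(file)
  }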
Note: restore is called by DStream.restoreCheckpointData().
The Checkpoint class requires a StreamingContext and a checkpointTime to be instantiated. The internal property checkpointTime corresponds to the batch time it represents.
Note: A Checkpoint object is written to a persistent storage (aka serialized) using the CheckpointWriter.write method and read back (aka deserialized) using Checkpoint.deserialize.
Note: The initial checkpoint is the checkpoint a StreamingContext was started with.
It is merely a collection of the settings of the current streaming runtime environment that is supposed to recreate the environment after it goes down due to a failure or when the streaming context is stopped immediately.
It collects the settings from the input StreamingContext (and indirectly from the corresponding JobScheduler and SparkContext), as the sketch after this list illustrates:
- The master URL from SparkContext as master
- The mandatory application name from SparkContext as framework
- The jars to distribute to workers from SparkContext as jars
- The DStreamGraph as graph
- The checkpoint directory as checkpointDir
- The checkpoint interval as checkpointDuration
- The collection of pending batches to process as pendingTimes
- The Spark configuration (aka SparkConf) as sparkConfPairs
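Put together, a hypothetical, simplified view of what a Checkpoint instance carries could look like this, with field names taken from the list above (this is not the real class):

// Simplified, illustrative view of a Checkpoint's payload.
case class CheckpointSettings(
  master: String,                        // master URL from SparkContext
  framework: String,                     // application name from SparkContext
  jars: Seq[String],                     // jars to distribute to workers
  graph: AnyRef,                         // the DStreamGraph (opaque here)
  checkpointDir: String,                 // checkpoint directory
  checkpointDurationMs: Long,            // checkpoint interval
  pendingTimes: Seq[Long],               // pending batches to process
  sparkConfPairs: Seq[(String, String)]) // Spark configuration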
Tip: Enable INFO logging for Checkpoint to see what happens inside. Refer to Logging.
serialize(checkpoint: Checkpoint, conf: SparkConf): Array[Byte]
serialize serializes the checkpoint object. It does so by creating a compression codec to write the input checkpoint object with, and returns the result as a collection of bytes.
Caution: FIXME Describe compression codecs in Spark.
deserialize(inputStream: InputStream, conf: SparkConf): Checkpoint
deserialize reconstructs a Checkpoint object from the input inputStream. It uses a compression codec and, once read, the just-built Checkpoint object is validated and returned.
Note: deserialize is called when reading the latest valid checkpoint file.
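For intuition, here is a hedged round-trip sketch using plain Java serialization; the real serialize/deserialize additionally wrap the streams in a Spark compression codec:

import java.io._

// Illustrative only: serialize an object to bytes and read it back.
def serializeSketch(obj: Serializable): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try oos.writeObject(obj) finally oos.close()
  bos.toByteArray
}

def deserializeSketch(bytes: Array[Byte]): AnyRef = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject() finally ois.close()
}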
validate(): Unit
validate validates the Checkpoint. It ensures that master, framework, graph, and checkpointTime are defined, i.e. not null.
Note: validate is called when a checkpoint is deserialized from an input stream.
You should see the following INFO message in the logs when the object passes the validation:
INFO Checkpoint: Checkpoint for time [checkpointTime] ms validated
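A minimal sketch of such null checks (illustrative; the real validate uses assertions with similar messages):

// Illustrative only: fail fast when any mandatory field is missing.
def validateSketch(
    master: String, framework: String, graph: AnyRef, checkpointTime: AnyRef): Unit = {
  require(master != null, "Checkpoint.master is null")
  require(framework != null, "Checkpoint.framework is null")
  require(graph != null, "Checkpoint.graph is null")
  require(checkpointTime != null, "Checkpoint.checkpointTime is null")
}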
getCheckpointFiles(checkpointDir: String, fsOption: Option[FileSystem] = None): Seq[Path]
The getCheckpointFiles method returns a collection of checkpoint files from the given checkpoint directory checkpointDir.
The method sorts the checkpoint files by time, with a temporary .bk checkpoint file first (given a pair of a checkpoint file and its backup file).
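The ordering can be illustrated with plain file names (checkpoint-[time], with an optional .bk suffix, as described later for CheckpointWriteHandler); this sketch is an approximation, not the actual implementation:

// Illustrative only: sort checkpoint file names by the embedded time,
// listing a backup (.bk) file just before its final counterpart.
def sortCheckpointFiles(files: Seq[String]): Seq[String] = {
  def timeOf(name: String): Long =
    name.stripSuffix(".bk").split('-').last.toLong
  files.sortBy(name => (timeOf(name), if (name.endsWith(".bk")) 0 else 1))
}

// sortCheckpointFiles(Seq("checkpoint-2000", "checkpoint-1000.bk", "checkpoint-1000"))
// => List(checkpoint-1000.bk, checkpoint-1000, checkpoint-2000)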
An instance of CheckpointWriter is created (lazily) when JobGenerator is created, but only when JobGenerator is configured for checkpointing.
It uses the internal single-thread thread pool executor to execute checkpoint writes asynchronously, and does so until it is stopped.
write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean): Unit
The write method serializes the checkpoint object and passes the serialized form to CheckpointWriteHandler to write asynchronously (i.e. on a separate thread) using the single-thread thread pool executor.
Note: write is called when JobGenerator receives a DoCheckpoint event and the batch time is eligible for checkpointing.
You should see the following INFO message in the logs:
INFO CheckpointWriter: Submitted checkpoint of time [checkpoint.checkpointTime] ms to writer queue
If the asynchronous checkpoint write fails, you should see the following ERROR in the logs:
ERROR Could not submit checkpoint task to the thread pool executor
stop(): Unit
CheckpointWriter uses the internal stopped flag to mark whether it is stopped or not.
Note: The stopped flag is disabled, i.e. false, when CheckpointWriter is created.
stop checks the internal stopped flag and returns if it says it is stopped already.
If not, it shuts down the internal single-thread thread pool executor in an orderly fashion and awaits termination for 10 seconds. During that time, any asynchronous checkpoint writes can safely finish, but no new tasks will be accepted.
Note: The wait time before the executor stops is fixed, i.e. not configurable, and is set to 10 seconds.
If the thread pool has not terminated after the 10 seconds, stop stops it forcefully.
You should see the following INFO message in the logs:
INFO CheckpointWriter: CheckpointWriter executor terminated? [terminated], waited for [time] ms.
CheckpointWriter is then marked as stopped, i.e. the stopped flag is set to true.
executor is an internal single-thread thread pool executor for executing asynchronous checkpoint writes using CheckpointWriteHandler.
It shuts down when CheckpointWriter is stopped (with a 10-second grace period before it is terminated forcefully).
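The threading model described above (asynchronous writes plus the 10-second graceful shutdown) boils down to a standard single-thread executor. The following is a hedged sketch, not the actual CheckpointWriter code:

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: submit checkpoint writes to a single-thread executor and
// shut it down with a 10-second grace period, forcing termination afterwards.
object CheckpointWriterSketch {
  private val executor = Executors.newFixedThreadPool(1)
  @volatile private var stopped = false

  def write(serializedCheckpoint: Array[Byte]): Unit =
    executor.execute(new Runnable {
      def run(): Unit = {
        // write serializedCheckpoint to the checkpoint file here
      }
    })

  def stop(): Unit = {
    if (stopped) return
    executor.shutdown()                    // no new tasks accepted
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      executor.shutdownNow()               // force termination after 10 seconds
    }
    stopped = true
  }
}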
CheckpointWriteHandler is an (internal) thread of execution that does checkpoint writes. It is instantiated with checkpointTime, the serialized form of the checkpoint, and the flag whether to clean checkpoint data later (as clearCheckpointDataLater).
Note: CheckpointWriteHandler is only used by CheckpointWriter to queue a checkpoint write for a batch time.
It records the current checkpoint time (in latestCheckpointTime) and calculates the name of the checkpoint file.
Note: The name of the checkpoint file is checkpoint-[checkpointTime.milliseconds].
It uses a backup file to do an atomic write, i.e. it writes to the checkpoint backup file first and renames the result file to the final checkpoint file name.
Note: The name of the checkpoint backup file is checkpoint-[checkpointTime.milliseconds].bk.
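The write-then-rename trick can be sketched with java.nio (illustrative only; the real handler writes through a Hadoop-compatible file system):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Illustrative only: write to checkpoint-[time].bk first, then rename it to
// checkpoint-[time], so readers never observe a partially written file.
def atomicCheckpointWrite(dir: String, timeMs: Long, bytes: Array[Byte]): Unit = {
  val backupFile = Paths.get(dir, s"checkpoint-$timeMs.bk")
  val checkpointFile = Paths.get(dir, s"checkpoint-$timeMs")
  Files.write(backupFile, bytes)
  Files.move(backupFile, checkpointFile, StandardCopyOption.REPLACE_EXISTING)
}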
Note: CheckpointWriteHandler does 3 write attempts at the maximum. The value is not configurable.
When attempting to write, you should see the following INFO message in the logs:
INFO CheckpointWriter: Saving checkpoint for time [checkpointTime] ms to file '[checkpointFile]'
Note: It deletes any checkpoint backup files that may exist from previous attempts.
It then deletes checkpoint files when there are more than 10.
Note: The number of checkpoint files when the deletion happens, i.e. 10, is fixed and not configurable.
You should see the following INFO message in the logs:
INFO CheckpointWriter: Deleting [file]
If all went fine, you should see the following INFO message in the logs:
INFO CheckpointWriter: Checkpoint for time [checkpointTime] ms saved to file '[checkpointFile]', took [bytes] bytes and [time] ms
JobGenerator is informed that the checkpoint write completed (with checkpointTime and the clearCheckpointDataLater flag).
In case of write failures, you can see the following WARN message in the logs:
WARN CheckpointWriter: Error in attempt [attempts] of writing checkpoint to [checkpointFile]
If the number of write attempts exceeds the fixed maximum of 3, or CheckpointWriter was stopped before any successful checkpoint write, you should see the following WARN message in the logs:
WARN CheckpointWriter: Could not write checkpoint for time [checkpointTime] to file '[checkpointFile]'
CheckpointReader is a private[streaming] helper class to read the latest valid checkpoint file (given the checkpoint directory) to recreate a StreamingContext from it.
read(checkpointDir: String): Option[Checkpoint]
read(checkpointDir: String, conf: SparkConf,
hadoopConf: Configuration, ignoreReadError: Boolean = false): Option[Checkpoint]
The read methods read the latest valid checkpoint file from the checkpoint directory checkpointDir. They differ in whether the Spark configuration conf and Hadoop configuration hadoopConf are given or created in place.
Note: The 4-parameter read method is used by StreamingContext to recreate itself from a checkpoint file.
The first read throws no SparkException when no checkpoint file could be read.
Note: It appears that no part of Spark Streaming uses the simplified version of read.
read uses Apache Hadoop’s Path and Configuration to get the checkpoint files (using Checkpoint.getCheckpointFiles) in reverse order.
If there is no checkpoint file in the checkpoint directory, it returns None.
You should see the following INFO message in the logs:
INFO CheckpointReader: Checkpoint files found: [checkpointFiles]
The method reads all the checkpoints (from the youngest to the oldest) until one is successfully loaded, i.e. deserialized.
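That retry-from-newest loop could be sketched like this (illustrative only; the real code catches exceptions per file and logs them):

import scala.util.{Success, Try}

// Illustrative only: try checkpoint files from newest to oldest and return
// the first one that deserializes successfully.
def readLatestCheckpoint[C](
    filesNewestFirst: Seq[String])(load: String => C): Option[C] =
  filesNewestFirst.iterator
    .map(file => Try(load(file)))
    .collectFirst { case Success(checkpoint) => checkpoint }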
You should see the following INFO message in the logs just before deserializing a checkpoint file:
INFO CheckpointReader: Attempting to load checkpoint from file [file]
If the checkpoint file was loaded, you should see the following INFO messages in the logs:
INFO CheckpointReader: Checkpoint successfully loaded from file [file]
INFO CheckpointReader: Checkpoint was generated at time [checkpointTime]
In case of any issues while loading a checkpoint file, you should see the following WARN in the logs and the corresponding exception:
WARN CheckpointReader: Error reading checkpoint from file [file]
Unless the ignoreReadError flag is enabled, when no checkpoint file could be read, a SparkException is thrown with the following message:
Failed to read checkpoint from directory [checkpointPath]
None is returned at this point and the method finishes.