Checkpointing is how Spark Structured Streaming remembers its state between micro-batches, and it is what makes exactly-once processing possible (together with a replayable source like Kafka and an idempotent or transactional sink). But over time, your checkpoint directory can become a cluttered mess of old, unnecessary files, leading to slower startup times and potential storage issues. This isn’t just about disk space; it’s about the performance overhead of Spark having to sift through mountains of old state.
Let’s see checkpointing in action. Imagine a simple Spark Streaming job that counts words from a Kafka topic:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("WordCountCheckpoint")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Read a stream of records from Kafka.
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()

// Split each record's value into individual words.
val words = lines.selectExpr("CAST(value AS STRING)")
  .as[String]
  .flatMap(_.split(" "))
  .toDF("word")

// Stateful aggregation: a running count per word.
val wordCounts = words.groupBy("word").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/spark-checkpoints/wordcount")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
```
When this job runs, Spark writes checkpoint data to /tmp/spark-checkpoints/wordcount. This includes metadata about the streaming query’s progress (like the last processed offset from Kafka) and, for stateful operations like groupBy, the actual aggregated state.
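The layout is easiest to understand on disk. The sketch below fabricates a miniature version of the directory structure Structured Streaming writes (the batch numbers and file contents are invented for illustration; a real directory is produced by Spark itself):

```shell
# Build a mock of the checkpoint layout Structured Streaming uses.
# Illustrative only -- contents are placeholders, not real Spark data.
CKPT=$(mktemp -d)/wordcount
mkdir -p "$CKPT/offsets" "$CKPT/commits" "$CKPT/state/0/0"

echo '{"id":"<query-uuid>"}' > "$CKPT/metadata"        # query identity
for b in 0 1 2; do
  echo "offsets for batch $b" > "$CKPT/offsets/$b"     # written before a batch runs
  echo "commit marker $b"     > "$CKPT/commits/$b"     # written after it completes
done
touch "$CKPT/state/0/0/1.delta" "$CKPT/state/0/0/2.delta"  # state store versions

# List the resulting tree, relative to the checkpoint root.
find "$CKPT" -mindepth 1 | sed "s|$CKPT/||" | sort
```

The pairing matters: an `offsets/N` file records what batch `N` intends to process, and `commits/N` appears only once that batch has finished, which is what lets recovery distinguish completed work from work that was in flight at a crash.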
Initially, this directory is small. But as the streaming job runs for hours, days, or weeks, it steadily accumulates files: one numbered entry per micro-batch (0, 1, 2, and so on) in the offsets/ and commits/ logs, plus a growing collection of delta and snapshot files under state/ for each stateful operator and partition.
The problem arises because Spark’s automatic cleanup only goes so far. A running query prunes its offset and commit logs down to a recent window, but state files linger until the store’s background maintenance compacts them, and nothing ever cleans up a checkpoint directory once its query has been stopped or redeployed with a new checkpoint location. The directory can therefore grow enormous, full of data that is no longer relevant for recovery, and every restart pays the cost of listing and loading from that bloated directory, increasing startup latency.
The Mental Model: State, Offsets, and Recovery
Think of checkpointing as Spark’s "save game" feature. For stateless operations (like just reading from Kafka and printing), Spark only needs to remember the last offset it read from each Kafka partition. These are stored as numbered files, one per micro-batch, in the offsets/ subdirectory of the checkpoint location.
For stateful operations (like groupBy().count()), Spark also stores the aggregated state itself, under the state/ subdirectory, organized by operator and partition. When Spark restarts, it finds the most recent batch recorded in both offsets/ and commits/, reloads the matching version of the state store, and resumes processing exactly where it left off.
The checkpointLocation is the root directory where all this information lives. Inside it, Spark maintains a metadata file identifying the query, an offsets/ directory (one numbered file per micro-batch, written before the batch runs), a commits/ directory (one numbered file per batch, written after it completes), and a state/ directory holding the state store’s delta and snapshot files.
The key is that Spark recovers from the most recent batch recorded as complete, not from the whole history. Older entries are technically not needed for recovery unless you’re trying to rewind to a specific past state for debugging, which is rarely the case for production recovery.
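Concretely, the latest recoverable batch is the highest number that appears in both the offsets/ and commits/ logs; an offsets file without a matching commit marker means that batch was still in flight when the job died. A shell sketch of that lookup, against a mocked-up directory (a real one is written by Spark):

```shell
# Mock: batch 4 started (offsets file written) but never committed.
CKPT=$(mktemp -d)
mkdir -p "$CKPT/offsets" "$CKPT/commits"
for b in 0 1 2 3 4; do touch "$CKPT/offsets/$b"; done
for b in 0 1 2 3;   do touch "$CKPT/commits/$b"; done

# Latest recoverable batch: highest number present in BOTH logs.
latest=""
for b in $(ls "$CKPT/offsets" | sort -n); do
  [ -f "$CKPT/commits/$b" ] && latest=$b
done
echo "recover from batch $latest"   # -> recover from batch 3
```

On restart, Spark would re-run batch 4 from its recorded offsets rather than discard it, which is exactly why the sink needs to tolerate replays for end-to-end exactly-once.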
The Hidden Cost: Compaction Lag
When a checkpoint holds thousands of files, even though Spark reads only the newest entries, simply listing the directory to find them can become slow, particularly on object stores like S3 where listing is an expensive metadata operation. More importantly, state recovery has its own lag: the default (HDFS-backed) state store restores each partition by loading its most recent snapshot and replaying every delta file written after it. If snapshot maintenance has fallen behind, or if the newest files are corrupted and you fall back to an older batch, a restart can spend a prohibitively long time re-applying those intermediate updates.
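That replay cost is measurable. A mocked sketch for a single state-store partition (the version numbers below are invented): recovery loads the newest snapshot, then re-applies every delta with a higher version.

```shell
# Mock one state-store partition: a snapshot at version 10, plus deltas.
P=$(mktemp -d)
touch "$P/10.snapshot"
for v in 8 9 10 11 12 13 14; do touch "$P/$v.delta"; done

# Newest snapshot version available.
snap=$(ls "$P" | grep '\.snapshot$' | sed 's/\.snapshot$//' | sort -n | tail -1)

# Deltas written after that snapshot -- each must be replayed on recovery.
replay=$(ls "$P" | grep '\.delta$' | sed 's/\.delta$//' \
  | awk -v s="$snap" '$1+0 > s+0' | wc -l)
echo "replay $replay deltas on top of snapshot $snap"
```

The further maintenance lags behind (a big gap between the newest delta and the newest snapshot), the longer each partition takes to restore.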
Compaction Strategy: Manual Cleanup
Structured Streaming’s built-in housekeeping is limited: a live query retains roughly the most recent spark.sql.streaming.minBatchesToRetain batches of its logs (100 by default), but there is no garbage collection beyond that window, and nothing ever cleans up the checkpoint directory of a query that has been retired or redeployed. The primary mechanism to manage the checkpoint directory is manual cleanup: you need to periodically delete checkpoint data you no longer need.
The Lever: You are the GC. You decide when and what to delete.
How to do it:
- Identify the Latest Checkpoint: While your streaming application is running, Spark appends the highest-numbered batch files. You can manually inspect the checkpoint directory (/tmp/spark-checkpoints/wordcount in our example) and look inside offsets/ and commits/ for the largest batch number. For instance, if you see 0, 1, 2, ..., 999, 1000, the latest batch is 1000.
- Delete Old Batch Files: You can use standard shell commands to delete log entries older than a certain number of batches or a specific age. Crucially, never delete the newest entries, or any state files the latest batch still needs, while the application is running.

For example, to keep only the most recent 500 batches’ worth of offset and commit files (assuming your job runs continuously, so file numbers increase sequentially):

```shell
# Example: keep only the 500 most recent batch files in each log.
# Make sure you have a backup or are absolutely sure before running this!
for log in offsets commits; do
  find /tmp/spark-checkpoints/wordcount/$log -maxdepth 1 -type f -name "[0-9]*" \
    | sort -V | head -n -500 | xargs -r rm -f
done
```

This command:

- find ... -maxdepth 1 -type f -name "[0-9]*": finds the numbered batch files directly under each log directory.
- sort -V: sorts them numerically (version sort).
- head -n -500: drops the last 500 entries from the sorted list (i.e., keeps the 500 most recent).
- xargs -r rm -f: deletes the selected old files, doing nothing if the list is empty.
Alternatively, you can delete based on age:

```shell
# Example: delete batch files older than 7 days.
# Less reliable than counting batches, since it depends on file-system timestamps.
find /tmp/spark-checkpoints/wordcount/offsets \
     /tmp/spark-checkpoints/wordcount/commits \
     -maxdepth 1 -type f -mtime +7 -name "[0-9]*" -exec rm -f {} \;
```

- Automate the Cleanup: This manual process should be automated. You can set up a cron job or a scheduled task that runs these find and rm commands periodically (e.g., daily or weekly).
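As a sketch of what that automation might look like: the loop below applies a keep-the-newest-N policy to each log directory. The root path, the KEEP value, and the mock setup are illustrative assumptions, not values Spark defines; in production you would point it at the real checkpoint location.

```shell
# Illustrative retention sketch: keep only the newest $KEEP batch files
# in each log directory. Exercised here against a mock checkpoint root.
CKPT_ROOT=$(mktemp -d)   # stand-in for /tmp/spark-checkpoints/wordcount
KEEP=3
mkdir -p "$CKPT_ROOT/offsets" "$CKPT_ROOT/commits"
for b in 0 1 2 3 4 5 6 7 8 9; do
  touch "$CKPT_ROOT/offsets/$b" "$CKPT_ROOT/commits/$b"
done

# The cleanup itself: everything but the newest $KEEP files is removed.
for log in offsets commits; do
  find "$CKPT_ROOT/$log" -maxdepth 1 -type f -name "[0-9]*" \
    | sort -V | head -n -$KEEP | xargs -r rm -f
done

ls "$CKPT_ROOT/offsets"   # only 7 8 9 remain
```

Wrapped in a script, this is a natural fit for a nightly cron entry (for example `30 2 * * * /opt/scripts/clean_checkpoints.sh`, where the script path is hypothetical).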
The most surprising true thing about checkpoint compaction is how little of it Spark does for you: beyond retaining a recent window of batches for a live query, it leaves this critical operational task entirely to the user. Spark handles the complexity of fault tolerance, but it delegates the responsibility of managing the resulting state’s footprint and retrieval performance to you.
The next problem you’ll encounter is managing the size of the state itself for stateful operations, which often requires understanding mapGroupsWithState or flatMapGroupsWithState and the available state store backends (such as the RocksDB provider).