foreachBatch is the secret weapon for integrating Spark Structured Streaming with arbitrary sinks.
Let’s see it in action. Imagine we’re processing a stream of user click events and want to update a real-time leaderboard.
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// The shape of our click events. With a real source (e.g. Kafka) you'd apply
// this via from_json; the built-in rate source used below does not accept a
// user-specified schema, so we derive matching columns instead.
val schema = StructType(Seq(
  StructField("userId", StringType),
  StructField("timestamp", TimestampType),
  StructField("page", StringType)
))

val clickStream = spark.readStream
  .format("rate") // emits `timestamp` and `value` columns
  .option("rowsPerSecond", 100)
  .load()
  .withColumn("userId", concat(lit("user_"), (col("value") % 10).cast("string")))
  .withColumn("page", lit("/home"))
  .select("userId", "timestamp", "page")
// A stand-in sink for demonstration. In a real job, this is where your
// actual write logic (database, cache, external API) would live.
def writeToLeaderboard(batchDF: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  println(s"Processing batch $batchId")
  if (!batchDF.isEmpty) {
    // Aggregate clicks per user in this batch
    val userCounts = batchDF.groupBy("userId").agg(count("*").as("clicks_in_batch"))
    // In a real sink, you'd connect to your database/cache here
    // and update the user scores based on userCounts.
    // For demonstration, we just show the aggregated counts.
    userCounts.show(truncate = false)
  } else {
    println(s"Batch $batchId is empty.")
  }
}
val query: StreamingQuery = clickStream
  .withWatermark("timestamp", "10 minutes") // Only matters for stateful operations (aggregations, joins); shown here for illustration
  .writeStream
  .foreachBatch(writeToLeaderboard _) // Pass our sink function here
  .option("checkpointLocation", "/tmp/spark-checkpoints/leaderboard") // Crucial for fault tolerance
  .trigger(Trigger.ProcessingTime("5 seconds")) // Process micro-batches every 5 seconds
  .start()

query.awaitTermination()
The problem foreachBatch solves is how to write streaming data to a sink that isn’t natively supported by Spark Structured Streaming’s writeStream, such as a JDBC database, a key-value cache, or an external REST API (Kafka, files, and the console, by contrast, are built-in sinks). It allows you to execute arbitrary code on each micro-batch of streaming data, giving you complete control over the output.
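To make that concrete, here is a sketch of writing each micro-batch to a relational database by reusing Spark’s ordinary batch JDBC writer inside foreachBatch. The connection URL, table name, and credentials are placeholders, not part of the original example:

```scala
// Hypothetical JDBC sink: each micro-batch is written with the *batch* JDBC writer.
def writeToPostgres(batchDF: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  batchDF.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/analytics") // placeholder
    .option("dbtable", "leaderboard")                            // placeholder
    .option("user", "spark")                                     // placeholder
    .option("password", sys.env.getOrElse("DB_PASSWORD", ""))
    .mode("append")
    .save()
}

clickStream.writeStream
  .foreachBatch(writeToPostgres _)
  .option("checkpointLocation", "/tmp/spark-checkpoints/jdbc-sink")
  .start()
```

Note that plain `mode("append")` will re-insert rows if a batch is replayed after a crash; the batchId-based deduplication discussed below is how you close that gap.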
Internally, Spark processes the incoming data stream and groups it into micro-batches. For each micro-batch, Spark creates a DataFrame representing the data that arrived within that micro-batch’s time window. The foreachBatch function receives this micro-batch DataFrame and the unique batchId as arguments. Your provided function then operates on this DataFrame to perform the desired write operation. This is fundamentally a batch processing operation happening repeatedly on incoming stream data.
The key levers you control are:
- The function itself: This is where you write your sink logic. You can connect to databases, call external APIs, update caches, or perform any other side effect.
- checkpointLocation: This is non-negotiable for fault tolerance. Spark uses this directory to store metadata about processed batches, offsets, and state. If your job restarts, it resumes from the last checkpointed state, preventing data loss and, together with an idempotent sink, duplicate output.
- trigger: This defines how often Spark processes new data. Trigger.ProcessingTime("5 seconds") means Spark will attempt to process any available data every 5 seconds, creating a new micro-batch.
- withWatermark: Essential when your pipeline involves stateful operations (like aggregations or joins across batches) or when you need to handle late-arriving data. It tells Spark how long to wait for late data before considering a window complete.
The batchId is not just an arbitrary number; it’s a monotonically increasing identifier for each micro-batch. This is incredibly useful for achieving effectively exactly-once semantics with stateful sinks: you can use it to make writes idempotent by recording the last committed batchId in the sink and skipping batches that were already written, or to track progress and reprocess specific batches if an error occurs.
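The skip-already-committed pattern can be sketched as follows. `lastCommittedBatchId()` and `commitBatch()` are hypothetical helpers backed by your sink, for example a metadata row updated in the same transaction as the data write:

```scala
// Idempotent sink sketch: replayed batches after a restart are detected
// by comparing against the highest batchId the sink has committed.
def idempotentWrite(batchDF: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  if (batchId <= lastCommittedBatchId()) {
    // Spark replayed a batch we already wrote; do nothing.
    println(s"Batch $batchId already committed; skipping replay.")
  } else {
    // Write the data AND record batchId atomically (e.g. one transaction),
    // so a crash between the two cannot leave them inconsistent.
    commitBatch(batchDF, batchId)
  }
}
```

The atomicity of the data write and the batchId update is what turns Spark’s at-least-once replay into effectively exactly-once output.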
The most surprising thing about foreachBatch is that it allows you to execute arbitrary, non-streaming code within a streaming job, yet maintain the core guarantees of Structured Streaming like fault tolerance and exactly-once processing (when combined with checkpointing and an idempotent sink). You’re essentially treating each micro-batch as a small, independent batch job.
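Because each micro-batch is just a DataFrame, you can also do things no single native sink can, such as fanning one batch out to several destinations. A minimal sketch (the Parquet path and console output are illustrative):

```scala
// Fan-out sink: write one micro-batch to multiple destinations.
def fanOut(batchDF: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  // Cache so the batch is computed once, not once per destination.
  batchDF.persist()
  batchDF.write.mode("append").parquet("/tmp/clicks/raw") // archival copy
  batchDF.groupBy("userId").count().show()                // live debugging view
  batchDF.unpersist()
}
```

Without the `persist()`, each write would re-evaluate the batch’s lineage from scratch.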
The next hurdle you’ll likely face is managing the state of your sink across restarts, especially if your sink itself isn’t inherently idempotent.