A Spark Structured Streaming window operation doesn’t literally "process" data in windows; it groups and aggregates records into buckets keyed by event-time intervals.
Let’s see this in action. Imagine we have a stream of click events, each with a timestamp. We want to count how many clicks happen in 10-minute intervals.
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
// Assume df is your DataFrame with a 'timestamp' column (TimestampType)
// and potentially other data.
// Example schema:
// val schema = StructType(Seq(
// StructField("userId", StringType, true),
// StructField("timestamp", TimestampType, false)
// ))
// val df = spark.readStream.schema(schema).json("/path/to/clickstream/data")
// For demonstration, let's create a dummy stream using the built-in rate
// source, which emits rows with a 'timestamp' and an incrementing 'value'
// column. (Note: spark.range produces a batch DataFrame, so a true streaming
// source is needed here for writeStream to work.)
val clickEvents = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
.withColumnRenamed("value", "eventId")
val streamingClicks = clickEvents
.withWatermark("timestamp", "10 minutes") // Crucial for late data handling
.groupBy(window($"timestamp", "10 minutes"))
.count()
val query: StreamingQuery = streamingClicks
.writeStream
.outputMode("append") // or "complete" depending on your aggregation
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds")) // How often Spark checks for new data
.start()
query.awaitTermination()
This code defines a streaming source of click events. We then apply withWatermark, which tells Spark how long it should keep accepting late records for a given window. The groupBy(window($"timestamp", "10 minutes")) is the core of the tumbling window: it groups all records whose timestamp falls within the same non-overlapping 10-minute interval. For each of these 10-minute windows, count() tallies the records. The trigger dictates how often Spark checks for and processes newly arrived data.
The fundamental problem Spark Streaming windows solve is enabling stateful computations over unbounded data streams. Without windows, you’d need to maintain a continuous, growing state to aggregate data across all time, which is infeasible. Windows break this down into manageable, time-bound chunks.
Internally, Spark maintains state for each active window. When data arrives, each record is assigned to its relevant window(s). Spark tracks the stream’s event-time progress via the watermark; once a window’s end time falls sufficiently behind the watermark, no more late data is expected for that window, so Spark can finalize its aggregation, output the result, and discard its state.
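The bookkeeping described above can be sketched in plain Scala. This is an illustrative model, not Spark’s actual internals; all names here (onEvent, finalizeClosedWindows, etc.) are ours:

```scala
import scala.collection.mutable

val minuteMs = 60 * 1000L
val windowMs = 10 * minuteMs         // tumbling window size
val watermarkDelayMs = 10 * minuteMs // mirrors withWatermark("timestamp", "10 minutes")

val counts = mutable.Map.empty[Long, Long] // window start -> running count
var maxEventTime = 0L                      // drives the watermark

def onEvent(eventTimeMs: Long): Unit = {
  maxEventTime = math.max(maxEventTime, eventTimeMs)
  val windowStart = eventTimeMs - (eventTimeMs % windowMs)
  counts(windowStart) = counts.getOrElse(windowStart, 0L) + 1
}

// A window is closed once its end time falls at or behind the watermark;
// its aggregate can then be emitted and its state discarded.
def finalizeClosedWindows(): Seq[(Long, Long)] = {
  val watermark = maxEventTime - watermarkDelayMs
  counts.keys.filter(_ + windowMs <= watermark).toSeq.sorted
    .map(start => (start, counts.remove(start).get))
}
```

After events at 0:01, 0:07, and 0:25, the watermark sits at 0:15, so the 0:00–0:10 window (count 2) is finalized while the 0:20–0:30 window stays open.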
Tumbling windows are fixed-size, non-overlapping intervals. If you have a 10-minute tumbling window, it will process data from 0:00-0:10, then 0:10-0:20, then 0:20-0:30, and so on. Each record belongs to exactly one window.
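The assignment of a record to its tumbling window is plain modular arithmetic on the event time; a minimal sketch (the helper name is ours, not Spark’s API):

```scala
// Illustrative helper: map an event time (epoch millis) to the start of
// its tumbling window.
def tumblingWindowStart(eventTimeMs: Long, windowMs: Long): Long =
  eventTimeMs - (eventTimeMs % windowMs)

val tenMinMs = 10 * 60 * 1000L
// An event at 0:07 maps to the window starting at 0:00;
// an event at 0:12 maps to the window starting at 0:10.
println(tumblingWindowStart(7 * 60 * 1000L, tenMinMs))  // 0
println(tumblingWindowStart(12 * 60 * 1000L, tenMinMs)) // 600000
```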
Sliding windows, on the other hand, overlap. If you define a 10-minute sliding window with a slide interval of 5 minutes, Spark will compute aggregations for the intervals 0:00-0:10, 0:05-0:15, 0:10-0:20, etc. This is useful for looking at metrics over a rolling period, like "the last 10 minutes of activity" as time progresses.
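Because sliding windows overlap, one record can belong to several of them (windowDuration / slideDuration windows, when the duration is a multiple of the slide). A sketch of that assignment logic, with an illustrative helper that is not part of Spark’s API:

```scala
// Illustrative helper: every window (start inclusive, end exclusive) that
// contains a given event time, for a window of windowMs sliding every slideMs.
def slidingWindows(eventTimeMs: Long, windowMs: Long,
                   slideMs: Long): Seq[(Long, Long)] = {
  // The latest candidate window starts at or before the event time.
  val lastStart = eventTimeMs - (eventTimeMs % slideMs)
  // Walk backwards one slide at a time while the window still covers the event.
  Iterator.iterate(lastStart)(_ - slideMs)
    .takeWhile(start => start >= 0 && start + windowMs > eventTimeMs)
    .map(start => (start, start + windowMs))
    .toSeq
    .sortBy(_._1)
}

val minute = 60 * 1000L
// An event at 0:07 with a 10-minute window sliding every 5 minutes lands in
// both the 0:00-0:10 and the 0:05-0:15 windows.
println(slidingWindows(7 * minute, 10 * minute, 5 * minute))
```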
// Example of a sliding window
val slidingWindowClicks = clickEvents
.withWatermark("timestamp", "10 minutes")
.groupBy(window($"timestamp", "10 minutes", "5 minutes")) // 10 min window, slides every 5 min
.count()
// ... then writeStream as before ...
The key difference in the code is the window($"timestamp", "10 minutes", "5 minutes") syntax. The first argument is the timestamp column, the second is the window duration, and the third is the slide interval.
One crucial, often misunderstood, aspect of windowing is how the watermark interacts with late data and window completion. The watermark defines how much lateness is tolerated: Spark computes it as the maximum event time it has seen so far minus the watermark delay. Once the watermark advances past a window’s end time, Spark can finalize that window’s aggregation, emit the result (in append mode), and drop its state; records for that window arriving afterwards are discarded. If you don’t set a watermark on an aggregation, Spark must hold window state indefinitely; if you set the delay too small, genuinely late records are dropped and their windows may be emitted incomplete.
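The late-data rule reduces to a simple comparison; an illustrative helper (ours, not Spark’s API) makes the decision explicit:

```scala
// Illustrative: a record is discarded when the tumbling window it belongs to
// has already been closed, i.e. that window's end time is at or behind the
// watermark (max event time seen so far minus the watermark delay).
def isDroppedAsLate(eventTimeMs: Long, maxEventTimeMs: Long,
                    delayMs: Long, windowMs: Long): Boolean = {
  val watermark = maxEventTimeMs - delayMs
  val windowEnd = eventTimeMs - (eventTimeMs % windowMs) + windowMs
  windowEnd <= watermark
}

val minMs = 60 * 1000L
// The stream has seen events up to 0:25 with a 10-minute watermark delay,
// so the watermark sits at 0:15 and the 0:00-0:10 window is closed.
println(isDroppedAsLate(7 * minMs, 25 * minMs, 10 * minMs, 10 * minMs))  // true
println(isDroppedAsLate(12 * minMs, 25 * minMs, 10 * minMs, 10 * minMs)) // false
```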
The next concept you’ll encounter is how to handle complex aggregations within these windows, such as agg() with multiple functions, or using arbitrary stateful operations with flatMapGroupsWithState.
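As a preview, here is a hedged sketch of a multi-aggregate window, building on the clickEvents stream above; the output column names are ours, and the block assumes an active SparkSession with spark.implicits._ imported:

```scala
import org.apache.spark.sql.functions._

// Several aggregates per 10-minute tumbling window in a single agg() call.
val perWindowStats = clickEvents
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes"))
  .agg(
    count(lit(1)).as("clicks"),                       // total clicks in the window
    approx_count_distinct("eventId").as("distinct"),  // approximate distinct events
    min($"timestamp").as("firstClick"),               // earliest event time seen
    max($"timestamp").as("lastClick")                 // latest event time seen
  )
```

Note that exact countDistinct is not supported in streaming aggregations, which is why the approximate variant is used here.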